001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.stream; 020 021import java.nio.file.Path; 022import java.nio.file.Paths; 023import java.time.Duration; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Objects; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.common.Environment; 031import org.nuxeo.lib.stream.computation.Settings; 032import org.nuxeo.lib.stream.computation.StreamProcessor; 033import org.nuxeo.lib.stream.computation.Topology; 034import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager; 037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 038import org.nuxeo.runtime.api.Framework; 039import org.nuxeo.runtime.codec.CodecService; 040import org.nuxeo.runtime.kafka.KafkaConfigService; 041import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl; 042import org.nuxeo.runtime.model.ComponentContext; 043import org.nuxeo.runtime.model.ComponentInstance; 044import org.nuxeo.runtime.model.ComponentManager; 045import org.nuxeo.runtime.model.DefaultComponent; 046 047/** 048 * @since 9.3 049 */ 050public class StreamServiceImpl extends DefaultComponent implements StreamService { 051 public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir"; 052 053 public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration"; 054 055 protected static final String LOG_CONFIG_XP = "logConfig"; 056 057 protected static final String STREAM_PROCESSOR_XP = "streamProcessor"; 058 059 private static final Log log = LogFactory.getLog(StreamServiceImpl.class); 060 061 protected final Map<String, LogConfigDescriptor> configs = new HashMap<>(); 062 063 protected final Map<String, LogManager> managers = new HashMap<>(); 064 065 protected final Map<String, StreamProcessor> processors = new HashMap<>(); 066 067 protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap<>(); 068 069 @Override 070 public int getApplicationStartedOrder() { 071 // start after kafka config service 072 return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10; 073 } 074 075 @Override 076 public LogManager getLogManager(String name) { 077 // TODO: returns a wrapper that don't expose the LogManager#close 078 if (!managers.containsKey(name)) { 079 if (!configs.containsKey(name)) { 080 throw new IllegalArgumentException("Unknown logConfig: " + name); 081 } 082 LogConfigDescriptor config = configs.get(name); 083 if (config.isKafkaLog()) { 084 managers.put(name, createKafkaLogManager(config)); 085 } else { 086 managers.put(name, createChronicleLogManager(config)); 087 } 088 } 089 return managers.get(name); 090 } 091 092 protected LogManager createKafkaLogManager(LogConfigDescriptor config) { 093 String kafkaConfig = config.getOption("kafkaConfig", "default"); 094 KafkaConfigService service = Framework.getService(KafkaConfigService.class); 095 return new KafkaLogManager(service.getTopicPrefix(kafkaConfig), service.getProducerProperties(kafkaConfig), 096 service.getConsumerProperties(kafkaConfig)); 097 } 098 099 protected LogManager createChronicleLogManager(LogConfigDescriptor config) { 100 String basePath = config.getOption("basePath", null); 101 String directory = config.getOption("directory", config.getName()); 102 Path path = getChroniclePath(basePath, directory); 103 String retention = getChronicleRetention(config.getOption("retention", null)); 104 return new ChronicleLogManager(path, retention); 105 } 106 107 protected String getChronicleRetention(String retention) { 108 return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d"); 109 } 110 111 protected Path getChroniclePath(String basePath, String name) { 112 if (basePath != null) { 113 return Paths.get(basePath, name).toAbsolutePath(); 114 } 115 basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP); 116 if (basePath != null) { 117 return Paths.get(basePath, name).toAbsolutePath(); 118 } 119 basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR); 120 if (basePath != null) { 121 return Paths.get(basePath, "stream", name).toAbsolutePath(); 122 } 123 return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath(); 124 } 125 126 protected void createStreamIfNotExists(String name, LogConfigDescriptor config) { 127 if (config.getLogsToCreate().isEmpty()) { 128 return; 129 } 130 LogManager manager = getLogManager(name); 131 config.getLogsToCreate().forEach((stream, size) -> { 132 log.info("Create if not exists stream: " + stream + " with manager: " + name); 133 manager.createIfNotExists(stream, size); 134 }); 135 } 136 137 @Override 138 public void start(ComponentContext context) { 139 super.start(context); 140 configs.forEach(this::createStreamIfNotExists); 141 processorDescriptors.forEach(this::initProcessor); 142 new ComponentsLifeCycleListener().install(); 143 } 144 145 protected void initProcessor(String name, StreamProcessorDescriptor descriptor) { 146 if (processors.containsKey(name)) { 147 log.error("Processor already initialized: " + name); 148 return; 149 } 150 log.info("Init Stream processor: " + name + " with manager: " + descriptor.config); 151 LogManager manager = getLogManager(descriptor.config); 152 Topology topology = descriptor.getTopology(); 153 StreamProcessor streamProcessor = new LogStreamProcessor(manager); 154 Settings settings = descriptor.getSettings(Framework.getService(CodecService.class)); 155 if (log.isDebugEnabled()) { 156 log.debug("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings)); 157 } 158 streamProcessor.init(topology, settings); 159 processors.put(name, streamProcessor); 160 } 161 162 @Override 163 public void stop(ComponentContext context) throws InterruptedException { 164 super.stop(context); 165 stopComputations(); // should have already be done by the beforeStop listener 166 closeLogManagers(); 167 } 168 169 protected void startComputations() { 170 processorDescriptors.keySet().forEach(name -> { 171 StreamProcessor manager = processors.get(name); 172 if (manager != null) { 173 manager.start(); 174 } 175 }); 176 } 177 178 protected void stopComputations() { 179 processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1))); 180 processors.clear(); 181 } 182 183 protected void closeLogManagers() { 184 managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close); 185 managers.clear(); 186 } 187 188 @Override 189 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 190 if (extensionPoint.equals(LOG_CONFIG_XP)) { 191 LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution; 192 configs.put(descriptor.name, descriptor); 193 log.debug(String.format("Register logConfig: %s", descriptor.name)); 194 } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) { 195 StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution; 196 processorDescriptors.put(descriptor.name, descriptor); 197 log.debug(String.format("Register Stream StreamProcessorTopologyProcessor: %s", descriptor.name)); 198 } 199 } 200 201 @Override 202 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 203 if (extensionPoint.equals(LOG_CONFIG_XP)) { 204 LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution; 205 configs.remove(descriptor.name); 206 log.debug(String.format("Unregister logConfig: %s", descriptor.name)); 207 } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) { 208 StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution; 209 processorDescriptors.remove(descriptor.name); 210 log.debug(String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", descriptor.name)); 211 } 212 } 213 214 protected class ComponentsLifeCycleListener implements ComponentManager.Listener { 215 @Override 216 public void afterStart(ComponentManager mgr, boolean isResume) { 217 // this is called once all components are started and ready 218 startComputations(); 219 } 220 221 @Override 222 public void beforeStop(ComponentManager mgr, boolean isStandby) { 223 // this is called before components are stopped 224 stopComputations(); 225 Framework.getRuntime().getComponentManager().removeListener(this); 226 } 227 } 228}