/*
 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.runtime.stream;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.logging.log4j.Logger;
import org.nuxeo.common.Environment;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.kafka.KafkaConfigService;
import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;

/**
 * Component implementing {@link StreamService}: it hands out {@link LogManager} instances built from the
 * {@code logConfig} contributions and initializes, starts and stops the {@link StreamProcessor}s declared through the
 * {@code streamProcessor} contributions.
 *
 * @since 9.3
 */
public class StreamServiceImpl extends DefaultComponent implements StreamService {

    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(StreamServiceImpl.class);

    /** Framework property giving the base directory used for Chronicle logs. */
    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";

    /** Framework property giving the Chronicle retention used when a log config does not set one. */
    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";

    /** Codec name used when a stream processor descriptor does not declare a default codec. */
    public static final String DEFAULT_CODEC = "avro";

    protected static final String XP_LOG_CONFIG = "logConfig";

    protected static final String XP_STREAM_PROCESSOR = "streamProcessor";

    /** Log managers created so far, keyed by log config name. */
    protected final Map<String, LogManager> managers = new HashMap<>();

    /** Initialized stream processors, keyed by descriptor id. */
    protected final Map<String, StreamProcessor> processors = new HashMap<>();

    @Override
    public int getApplicationStartedOrder() {
        // must come after the Kafka config service
        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10;
    }

    /**
     * Returns the log manager registered under the given log config name, creating and caching it on first access.
     *
     * @param name the id of a {@code logConfig} contribution
     * @return the cached or newly created manager
     * @throws IllegalArgumentException if no log config is registered under {@code name}
     */
    @Override
    public LogManager getLogManager(String name) {
        // TODO: return a wrapper that does not expose LogManager#close
        if (managers.containsKey(name)) {
            return managers.get(name);
        }
        LogConfigDescriptor config = getDescriptor(XP_LOG_CONFIG, name);
        if (config == null) {
            throw new IllegalArgumentException("Unknown logConfig: " + name);
        }
        // any type other than "kafka" (case-insensitive) falls back to Chronicle
        LogManager created = "kafka".equalsIgnoreCase(config.type) ? createKafkaLogManager(config)
                : createChronicleLogManager(config);
        managers.put(name, created);
        return created;
    }

    /**
     * Builds a Kafka backed log manager. The Kafka configuration name is read from the {@code kafkaConfig} option and
     * defaults to {@code default}.
     */
    protected LogManager createKafkaLogManager(LogConfigDescriptor config) {
        String kafkaConfigName = config.options.getOrDefault("kafkaConfig", "default");
        KafkaConfigService kafkaService = Framework.getService(KafkaConfigService.class);
        return new KafkaLogManager(kafkaService.getTopicPrefix(kafkaConfigName),
                kafkaService.getProducerProperties(kafkaConfigName),
                kafkaService.getConsumerProperties(kafkaConfigName));
    }

    /**
     * Builds a Chronicle backed log manager from the {@code basePath}, {@code directory} and {@code retention}
     * options. The directory defaults to the config id.
     */
    protected LogManager createChronicleLogManager(LogConfigDescriptor config) {
        String configuredBasePath = config.options.getOrDefault("basePath", null);
        String directoryName = config.options.getOrDefault("directory", config.getId());
        String configuredRetention = config.options.getOrDefault("retention", null);
        Path storagePath = getChroniclePath(configuredBasePath, directoryName);
        return new ChronicleLogManager(storagePath, getChronicleRetention(configuredRetention));
    }

    /**
     * Returns the given retention, or the value of the {@link #NUXEO_STREAM_RET_DURATION_PROP} property (default
     * {@code 4d}) when it is {@code null}.
     */
    protected String getChronicleRetention(String retention) {
        if (retention != null) {
            return retention;
        }
        return Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
    }

    /**
     * Resolves the absolute directory of a Chronicle log. Lookup order for the parent directory:
     * <ol>
     * <li>the explicit {@code basePath},</li>
     * <li>the {@link #NUXEO_STREAM_DIR_PROP} property,</li>
     * <li>the {@code stream} folder under the {@link Environment#NUXEO_DATA_DIR} property,</li>
     * <li>the {@code data/stream} folder under the runtime home.</li>
     * </ol>
     *
     * @param basePath explicit parent directory, may be {@code null}
     * @param name the directory name of the log
     */
    protected Path getChroniclePath(String basePath, String name) {
        String streamRoot = basePath != null ? basePath : Framework.getProperty(NUXEO_STREAM_DIR_PROP);
        if (streamRoot != null) {
            return Paths.get(streamRoot, name).toAbsolutePath();
        }
        String dataDir = Framework.getProperty(Environment.NUXEO_DATA_DIR);
        if (dataDir != null) {
            return Paths.get(dataDir, "stream", name).toAbsolutePath();
        }
        String runtimeHome = Framework.getRuntime().getHome().getAbsolutePath();
        return Paths.get(runtimeHome, "data", "stream", name).toAbsolutePath();
    }

    /**
     * Creates every log declared in the given config that does not exist yet. Nothing happens, and no manager is
     * created, when the config declares no log.
     */
    protected void createStreamIfNotExists(LogConfigDescriptor config) {
        if (config.logs.isEmpty()) {
            return;
        }
        String managerName = config.getId();
        LogManager manager = getLogManager(managerName);
        config.logs.forEach(logDesc -> {
            log.info("Create if not exists stream: {} with manager: {}", logDesc.getId(), config.getId());
            manager.createIfNotExists(logDesc.getId(), logDesc.size);
        });
    }

    /**
     * Creates the declared logs, initializes the declared stream processors and installs the life cycle listener that
     * starts and stops the computations.
     */
    @Override
    public void start(ComponentContext context) {
        super.start(context);
        List<LogConfigDescriptor> logConfigs = getDescriptors(XP_LOG_CONFIG);
        for (LogConfigDescriptor logConfig : logConfigs) {
            createStreamIfNotExists(logConfig);
        }
        List<StreamProcessorDescriptor> processorDescs = getDescriptors(XP_STREAM_PROCESSOR);
        for (StreamProcessorDescriptor processorDesc : processorDescs) {
            initProcessor(processorDesc);
        }
        new ComponentsLifeCycleListener().install();
    }

    /**
     * Initializes, without starting, the stream processor described by the given descriptor and registers it under
     * the descriptor id. A descriptor whose id is already registered is logged as an error and skipped.
     *
     * @throws StreamRuntimeException if the topology class cannot be instantiated
     */
    protected void initProcessor(StreamProcessorDescriptor descriptor) {
        String processorId = descriptor.getId();
        if (processors.containsKey(processorId)) {
            log.error("Processor already initialized: {}", descriptor.getId());
            return;
        }
        log.info("Init Stream processor: {} with manager: {}", descriptor.getId(), descriptor.config);
        LogManager manager = getLogManager(descriptor.config);
        Topology topology = instantiateTopology(descriptor);
        StreamProcessor processor = new LogStreamProcessor(manager);
        Settings settings = getSettings(descriptor);
        // suppliers keep the PlantUML rendering lazy: it only runs when debug is enabled
        log.debug("Starting computation topology: {}\n{}", descriptor::getId, () -> topology.toPlantuml(settings));
        processor.init(topology, settings);
        processors.put(processorId, processor);
    }

    /**
     * Instantiates the descriptor's topology class through its no-arg constructor and asks it for the topology.
     */
    private Topology instantiateTopology(StreamProcessorDescriptor descriptor) {
        try {
            return descriptor.klass.getDeclaredConstructor().newInstance().getTopology(descriptor.options);
        } catch (ReflectiveOperationException e) {
            throw new StreamRuntimeException("Can not create topology for processor: " + descriptor.getId(), e);
        }
    }

    /**
     * Builds the processor settings from the descriptor: default concurrency, partitions, codec and policy, then the
     * per-computation concurrency, the per-name policies and the per-stream partitions and codecs.
     */
    protected Settings getSettings(StreamProcessorDescriptor descriptor) {
        CodecService codecService = Framework.getService(CodecService.class);
        String defaultCodecName = descriptor.defaultCodec == null ? DEFAULT_CODEC : descriptor.defaultCodec;
        Codec<Record> defaultCodec = codecService.getCodec(defaultCodecName, Record.class);
        Settings settings = new Settings(descriptor.defaultConcurrency, descriptor.defaultPartitions, defaultCodec,
                descriptor.getDefaultPolicy());
        descriptor.computations.forEach(
                computation -> settings.setConcurrency(computation.name, computation.concurrency));
        descriptor.policies.forEach(
                policyDesc -> settings.setPolicy(policyDesc.name, descriptor.getPolicy(policyDesc.name)));
        descriptor.streams.forEach(streamDesc -> settings.setPartitions(streamDesc.name, streamDesc.partitions));
        // only streams declaring their own codec override the default one
        descriptor.streams.stream()
                          .filter(streamDesc -> streamDesc.codec != null)
                          .forEach(streamDesc -> settings.setCodec(streamDesc.name,
                                  codecService.getCodec(streamDesc.codec, Record.class)));
        return settings;
    }

    @Override
    public void stop(ComponentContext context) throws InterruptedException {
        super.stop(context);
        stopComputations(); // should already have been done by the beforeStop listener
        closeLogManagers();
    }

    /**
     * Starts the initialized processor, if any, of each registered stream processor descriptor.
     */
    protected void startComputations() {
        getDescriptors(XP_STREAM_PROCESSOR).forEach(descriptor -> {
            StreamProcessor processor = processors.get(descriptor.getId());
            if (processor != null) {
                processor.start();
            }
        });
    }

    /**
     * Stops every registered processor with a one second timeout, then forgets them all.
     */
    protected void stopComputations() {
        for (StreamProcessor processor : processors.values()) {
            processor.stop(Duration.ofSeconds(1));
        }
        processors.clear();
    }

    /**
     * Closes every non-null cached log manager, then empties the cache.
     */
    protected void closeLogManagers() {
        managers.values()
                .stream()
                .filter(Objects::nonNull)
                .forEach(LogManager::close);
        managers.clear();
    }

    /**
     * Listener tying the computations to the component manager life cycle.
     */
    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {

        @Override
        public void afterStart(ComponentManager mgr, boolean isResume) {
            // called once all components are started and ready
            startComputations();
        }

        @Override
        public void beforeStop(ComponentManager mgr, boolean isStandby) {
            // called before the components are stopped
            stopComputations();
            Framework.getRuntime().getComponentManager().removeListener(this);
        }
    }
}