001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.stream; 020 021import java.nio.file.Path; 022import java.nio.file.Paths; 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.logging.log4j.Logger; 030import org.nuxeo.common.Environment; 031import org.nuxeo.lib.stream.StreamRuntimeException; 032import org.nuxeo.lib.stream.codec.Codec; 033import org.nuxeo.lib.stream.computation.Record; 034import org.nuxeo.lib.stream.computation.Settings; 035import org.nuxeo.lib.stream.computation.StreamManager; 036import org.nuxeo.lib.stream.computation.StreamProcessor; 037import org.nuxeo.lib.stream.computation.Topology; 038import org.nuxeo.lib.stream.computation.log.LogStreamManager; 039import org.nuxeo.lib.stream.log.LogConfig; 040import org.nuxeo.lib.stream.log.LogManager; 041import org.nuxeo.lib.stream.log.Name; 042import org.nuxeo.lib.stream.log.UnifiedLogManager; 043import org.nuxeo.lib.stream.log.chronicle.ChronicleLogConfig; 044import org.nuxeo.lib.stream.log.kafka.KafkaLogConfig; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.codec.CodecService; 047import org.nuxeo.runtime.kafka.KafkaConfigService; 048import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl; 049import org.nuxeo.runtime.model.ComponentContext; 050import org.nuxeo.runtime.model.ComponentManager; 051import org.nuxeo.runtime.model.DefaultComponent; 052 053/** 054 * @since 9.3 055 */ 056public class StreamServiceImpl extends DefaultComponent implements StreamService { 057 058 private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(StreamServiceImpl.class); 059 060 public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir"; 061 062 public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration"; 063 064 public static final String DEFAULT_CODEC = "avro"; 065 066 protected static final String XP_LOG_CONFIG = "logConfig"; 067 068 protected static final String XP_STREAM_PROCESSOR = "streamProcessor"; 069 070 protected LogManager logManager; 071 072 protected StreamManager streamManager; 073 074 protected final Map<String, StreamProcessor> processors = new HashMap<>(); 075 076 /** 077 * @since 11.2 078 */ 079 public static final String STREAM_PROCESSING_ENABLED = "nuxeo.stream.processing.enabled"; 080 081 protected Boolean isStreamProcessingDisabled; 082 083 @Override 084 public int getApplicationStartedOrder() { 085 // start after kafka config service 086 return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10; 087 } 088 089 @Override 090 public LogManager getLogManager() { 091 // TODO: returns a wrapper that don't expose the LogManager#close 092 return logManager; 093 } 094 095 @Override 096 public StreamManager getStreamManager() { 097 return streamManager; 098 } 099 100 protected String getChronicleRetention(String retention) { 101 return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d"); 102 } 103 104 protected Path getChroniclePath(String basePath) { 105 if (basePath != null) { 106 return Paths.get(basePath).toAbsolutePath(); 107 } 108 basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP); 109 if (basePath != null) { 110 return Paths.get(basePath).toAbsolutePath(); 111 } 112 basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR); 113 if (basePath != null) { 114 return Paths.get(basePath, "stream").toAbsolutePath(); 115 } 116 return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream").toAbsolutePath(); 117 } 118 119 protected void createLogIfNotExists(LogConfigDescriptor config) { 120 if (!config.isEnabled() || config.logs.isEmpty()) { 121 return; 122 } 123 config.logs.forEach(l -> { 124 log.info("Create if not exists stream: {} with manager: {}", l.getId(), config.getId()); 125 logManager.createIfNotExists(Name.ofUrn(l.getId()), l.size); 126 }); 127 } 128 129 @Override 130 public void start(ComponentContext context) { 131 super.start(context); 132 List<LogConfig> configs = getLogConfigs(); 133 logManager = new UnifiedLogManager(configs); 134 streamManager = new LogStreamManager(logManager); 135 List<LogConfigDescriptor> logDescs = getDescriptors(XP_LOG_CONFIG); 136 logDescs.forEach(this::createLogIfNotExists); 137 List<StreamProcessorDescriptor> streamDescs = getDescriptors(XP_STREAM_PROCESSOR); 138 streamDescs.forEach(this::initProcessor); 139 new ComponentsLifeCycleListener().install(); 140 } 141 142 protected List<LogConfig> getLogConfigs() { 143 List<LogConfigDescriptor> logDescs = getDescriptors(XP_LOG_CONFIG); 144 List<LogConfig> ret = new ArrayList<>(logDescs.size()); 145 for (LogConfigDescriptor desc : logDescs) { 146 if (!desc.isEnabled() || desc.onlyLogDeclaration()) { 147 continue; 148 } 149 if ("kafka".equalsIgnoreCase(desc.type)) { 150 ret.add(createKafkaLogConfig(desc)); 151 } else { 152 ret.add(createChronicleLogConfig(desc)); 153 } 154 } 155 return ret; 156 } 157 158 protected LogConfig createKafkaLogConfig(LogConfigDescriptor desc) { 159 String kafkaConfig = desc.options.getOrDefault("kafkaConfig", "default"); 160 KafkaConfigService service = Framework.getService(KafkaConfigService.class); 161 return new KafkaLogConfig(desc.getId(), desc.isDefault(), desc.getPatterns(), 162 service.getTopicPrefix(kafkaConfig), 163 service.getAdminProperties(kafkaConfig), service.getProducerProperties(kafkaConfig), 164 service.getConsumerProperties(kafkaConfig)); 165 } 166 167 protected LogConfig createChronicleLogConfig(LogConfigDescriptor desc) { 168 String basePath = desc.options.getOrDefault("basePath", null); 169 Path path = getChroniclePath(basePath); 170 String retention = getChronicleRetention(desc.options.getOrDefault("retention", null)); 171 return new ChronicleLogConfig(desc.getId(), desc.isDefault(), desc.getPatterns(), path, retention); 172 } 173 174 protected void initProcessor(StreamProcessorDescriptor descriptor) { 175 if (! descriptor.isEnabled()) { 176 log.info("Processor {} disabled", descriptor.getId()); 177 return; 178 } 179 if (processors.containsKey(descriptor.getId())) { 180 log.error("Processor already initialized: {}", descriptor.getId()); 181 return; 182 } 183 log.info("Init Stream processor: {}", descriptor.getId()); 184 Topology topology; 185 try { 186 topology = descriptor.klass.getDeclaredConstructor().newInstance().getTopology(descriptor.options); 187 } catch (ReflectiveOperationException e) { 188 throw new StreamRuntimeException("Can not create topology for processor: " + descriptor.getId(), e); 189 } 190 Settings settings = getSettings(descriptor); 191 log.debug("Starting computation topology: {}\n{}", descriptor::getId, () -> topology.toPlantuml(settings)); 192 if (!isProcessingDisabled() && descriptor.isStart()) { 193 StreamProcessor streamProcessor = streamManager.registerAndCreateProcessor(descriptor.getId(), topology, 194 settings); 195 processors.put(descriptor.getId(), streamProcessor); 196 } else { 197 streamManager.register(descriptor.getId(), topology, settings); 198 processors.put(descriptor.getId(), null); 199 } 200 } 201 202 protected Settings getSettings(StreamProcessorDescriptor descriptor) { 203 CodecService codecService = Framework.getService(CodecService.class); 204 Codec<Record> actualCodec = descriptor.defaultCodec == null ? codecService.getCodec(DEFAULT_CODEC, Record.class) 205 : codecService.getCodec(descriptor.defaultCodec, Record.class); 206 Settings settings = new Settings(descriptor.defaultConcurrency, descriptor.defaultPartitions, actualCodec, 207 descriptor.getDefaultPolicy(), null, descriptor.defaultExternal); 208 descriptor.computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency)); 209 descriptor.policies.forEach(policy -> settings.setPolicy(policy.name, descriptor.getPolicy(policy.name))); 210 for (StreamProcessorDescriptor.StreamDescriptor streamDescriptor : descriptor.streams) { 211 settings.setPartitions(streamDescriptor.name, 212 streamDescriptor.partitions != null ? streamDescriptor.partitions : descriptor.defaultPartitions); 213 if (streamDescriptor.codec != null) { 214 settings.setCodec(streamDescriptor.name, codecService.getCodec(streamDescriptor.codec, Record.class)); 215 } 216 streamDescriptor.filters.forEach(filter -> settings.addFilter(streamDescriptor.name, filter.getFilter())); 217 settings.setExternal(Name.ofUrn(streamDescriptor.name), 218 streamDescriptor.external != null ? streamDescriptor.external : descriptor.defaultExternal); 219 } 220 return settings; 221 } 222 223 @Override 224 public void stop(ComponentContext context) throws InterruptedException { 225 super.stop(context); 226 logManager.close(); 227 } 228 229 protected void startProcessors() { 230 log.debug("Start processors"); 231 getDescriptors(XP_STREAM_PROCESSOR).forEach(d -> { 232 StreamProcessor processor = processors.get(d.getId()); 233 if (processor != null) { 234 processor.start(); 235 } 236 }); 237 } 238 239 @Override 240 public void stopProcessors() { 241 log.debug("Stop processors"); 242 processors.forEach((name, processor) -> { 243 if (processor != null) { 244 processor.stop(Duration.ofSeconds(1)); 245 } 246 }); 247 processors.clear(); 248 } 249 250 protected class ComponentsLifeCycleListener implements ComponentManager.Listener { 251 @Override 252 public void afterStart(ComponentManager mgr, boolean isResume) { 253 // this is called once all components are started and ready 254 log.debug("afterStart"); 255 startProcessors(); 256 } 257 258 @Override 259 public void beforeStop(ComponentManager mgr, boolean isStandby) { 260 // this is called before components are stopped 261 log.debug("beforeStop"); 262 stopProcessors(); 263 Framework.getRuntime().getComponentManager().removeListener(this); 264 } 265 } 266 267 protected boolean isProcessingDisabled() { 268 if (isStreamProcessingDisabled == null) { 269 if (Framework.isBooleanPropertyFalse(STREAM_PROCESSING_ENABLED)) { 270 log.warn("Stream Processing has been disabled on this node"); 271 isStreamProcessingDisabled = true; 272 } else { 273 isStreamProcessingDisabled = false; 274 } 275 } 276 return isStreamProcessingDisabled; 277 } 278}