001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.stream; 020 021import java.nio.file.Path; 022import java.nio.file.Paths; 023import java.time.Duration; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Objects; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.common.Environment; 031import org.nuxeo.lib.stream.computation.Settings; 032import org.nuxeo.lib.stream.computation.StreamProcessor; 033import org.nuxeo.lib.stream.computation.Topology; 034import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager; 037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 038import org.nuxeo.runtime.api.Framework; 039import org.nuxeo.runtime.kafka.KafkaConfigService; 040import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl; 041import org.nuxeo.runtime.model.ComponentContext; 042import org.nuxeo.runtime.model.ComponentInstance; 043import org.nuxeo.runtime.model.ComponentManager; 044import org.nuxeo.runtime.model.DefaultComponent; 045 046/** 047 * @since 9.3 048 */ 049public class StreamServiceImpl extends DefaultComponent implements StreamService { 050 public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir"; 051 052 public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration"; 053 054 protected static final String LOG_CONFIG_XP = "logConfig"; 055 056 protected static final String STREAM_PROCESSOR_XP = "streamProcessor"; 057 058 private static final Log log = LogFactory.getLog(StreamServiceImpl.class); 059 060 protected final Map<String, LogConfigDescriptor> configs = new HashMap<>(); 061 062 protected final Map<String, LogManager> managers = new HashMap<>(); 063 064 protected final Map<String, StreamProcessor> processors = new HashMap<>(); 065 066 protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap<>(); 067 068 @Override 069 public int getApplicationStartedOrder() { 070 // start after kafka config service 071 return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10; 072 } 073 074 @Override 075 public LogManager getLogManager(String name) { 076 // TODO: return a wrapper that don't expose the LogManager#close 077 if (!managers.containsKey(name)) { 078 if (!configs.containsKey(name)) { 079 throw new IllegalArgumentException("Unknown logConfig: " + name); 080 } 081 LogConfigDescriptor config = configs.get(name); 082 if (config.isKafkaLog()) { 083 managers.put(name, createKafkaLogManager(config)); 084 } else { 085 managers.put(name, createChronicleLogManager(config)); 086 } 087 } 088 return managers.get(name); 089 } 090 091 protected LogManager createKafkaLogManager(LogConfigDescriptor config) { 092 String kafkaConfig = config.getOption("kafkaConfig", "default"); 093 KafkaConfigService service = Framework.getService(KafkaConfigService.class); 094 return new KafkaLogManager(service.getZkServers(kafkaConfig), service.getTopicPrefix(kafkaConfig), 095 service.getProducerProperties(kafkaConfig), service.getConsumerProperties(kafkaConfig)); 096 } 097 098 protected LogManager createChronicleLogManager(LogConfigDescriptor config) { 099 String basePath = config.getOption("basePath", null); 100 String directory = config.getOption("directory", config.getName()); 101 Path path = getChroniclePath(basePath, directory); 102 String retention = getChronicleRetention(config.getOption("retention", null)); 103 return new ChronicleLogManager(path, retention); 104 } 105 106 protected String getChronicleRetention(String retention) { 107 return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d"); 108 } 109 110 protected Path getChroniclePath(String basePath, String name) { 111 if (basePath != null) { 112 return Paths.get(basePath, name).toAbsolutePath(); 113 } 114 basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP); 115 if (basePath != null) { 116 return Paths.get(basePath, name).toAbsolutePath(); 117 } 118 basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR); 119 if (basePath != null) { 120 return Paths.get(basePath, "stream", name).toAbsolutePath(); 121 } 122 return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath(); 123 } 124 125 protected void createStreamIfNotExists(String name, LogConfigDescriptor config) { 126 if (config.getLogsToCreate().isEmpty()) { 127 return; 128 } 129 LogManager manager = getLogManager(name); 130 config.getLogsToCreate().forEach((stream, size) -> { 131 log.info("Create if not exists stream: " + stream + " with manager: " + name); 132 manager.createIfNotExists(stream, size); 133 }); 134 } 135 136 @Override 137 public void start(ComponentContext context) { 138 super.start(context); 139 configs.forEach(this::createStreamIfNotExists); 140 processorDescriptors.forEach(this::initProcessor); 141 new ComponentsLifeCycleListener().install(); 142 } 143 144 protected void initProcessor(String name, StreamProcessorDescriptor descriptor) { 145 if (processors.containsKey(name)) { 146 log.error("Processor already initialized: " + name); 147 return; 148 } 149 log.info("Init Stream processor: " + name + " with manager: " + descriptor.config); 150 LogManager manager = getLogManager(descriptor.config); 151 Topology topology = descriptor.getTopology(); 152 StreamProcessor streamProcessor = new LogStreamProcessor(manager); 153 Settings settings = descriptor.getSettings(); 154 if (log.isDebugEnabled()) { 155 log.debug("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings)); 156 } 157 streamProcessor.init(topology, settings); 158 processors.put(name, streamProcessor); 159 } 160 161 @Override 162 public void stop(ComponentContext context) throws InterruptedException { 163 super.stop(context); 164 stopComputations(); // should have already be done by the beforeStop listener 165 closeLogManagers(); 166 } 167 168 protected void startComputations() { 169 processorDescriptors.keySet().forEach(name -> { 170 StreamProcessor manager = processors.get(name); 171 if (manager != null) { 172 manager.start(); 173 } 174 }); 175 } 176 177 protected void stopComputations() { 178 processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1))); 179 processors.clear(); 180 } 181 182 protected void closeLogManagers() { 183 managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close); 184 managers.clear(); 185 } 186 187 @Override 188 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 189 if (extensionPoint.equals(LOG_CONFIG_XP)) { 190 LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution; 191 configs.put(descriptor.name, descriptor); 192 log.debug(String.format("Register logConfig: %s", descriptor.name)); 193 } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) { 194 StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution; 195 processorDescriptors.put(descriptor.name, descriptor); 196 log.debug(String.format("Register Stream StreamProcessorTopologyProcessor: %s", descriptor.name)); 197 } 198 } 199 200 @Override 201 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 202 if (extensionPoint.equals(LOG_CONFIG_XP)) { 203 LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution; 204 configs.remove(descriptor.name); 205 log.debug(String.format("Unregister logConfig: %s", descriptor.name)); 206 } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) { 207 StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution; 208 processorDescriptors.remove(descriptor.name); 209 log.debug(String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", descriptor.name)); 210 } 211 } 212 213 protected class ComponentsLifeCycleListener implements ComponentManager.Listener { 214 @Override 215 public void afterStart(ComponentManager mgr, boolean isResume) { 216 // this is called once all components are started and ready 217 startComputations(); 218 } 219 220 @Override 221 public void beforeStop(ComponentManager mgr, boolean isStandby) { 222 // this is called before components are stopped 223 stopComputations(); 224 Framework.getRuntime().getComponentManager().removeListener(this); 225 } 226 } 227}