001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import java.nio.file.Path;
022import java.nio.file.Paths;
023import java.time.Duration;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Objects;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.common.Environment;
031import org.nuxeo.lib.stream.computation.Settings;
032import org.nuxeo.lib.stream.computation.StreamProcessor;
033import org.nuxeo.lib.stream.computation.Topology;
034import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.kafka.KafkaConfigService;
040import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
041import org.nuxeo.runtime.model.ComponentContext;
042import org.nuxeo.runtime.model.ComponentInstance;
043import org.nuxeo.runtime.model.ComponentManager;
044import org.nuxeo.runtime.model.DefaultComponent;
045
046/**
047 * @since 9.3
048 */
049public class StreamServiceImpl extends DefaultComponent implements StreamService {
050    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
051
052    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
053
054    protected static final String LOG_CONFIG_XP = "logConfig";
055
056    protected static final String STREAM_PROCESSOR_XP = "streamProcessor";
057
058    private static final Log log = LogFactory.getLog(StreamServiceImpl.class);
059
060    protected final Map<String, LogConfigDescriptor> configs = new HashMap<>();
061
062    protected final Map<String, LogManager> managers = new HashMap<>();
063
064    protected final Map<String, StreamProcessor> processors = new HashMap<>();
065
066    protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap<>();
067
068    @Override
069    public int getApplicationStartedOrder() {
070        // start after kafka config service
071        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10;
072    }
073
074    @Override
075    public LogManager getLogManager(String name) {
076        // TODO: return a wrapper that don't expose the LogManager#close
077        if (!managers.containsKey(name)) {
078            if (!configs.containsKey(name)) {
079                throw new IllegalArgumentException("Unknown logConfig: " + name);
080            }
081            LogConfigDescriptor config = configs.get(name);
082            if (config.isKafkaLog()) {
083                managers.put(name, createKafkaLogManager(config));
084            } else {
085                managers.put(name, createChronicleLogManager(config));
086            }
087        }
088        return managers.get(name);
089    }
090
091    protected LogManager createKafkaLogManager(LogConfigDescriptor config) {
092        String kafkaConfig = config.getOption("kafkaConfig", "default");
093        KafkaConfigService service = Framework.getService(KafkaConfigService.class);
094        return new KafkaLogManager(service.getZkServers(kafkaConfig), service.getTopicPrefix(kafkaConfig),
095                service.getProducerProperties(kafkaConfig), service.getConsumerProperties(kafkaConfig));
096    }
097
098    protected LogManager createChronicleLogManager(LogConfigDescriptor config) {
099        String basePath = config.getOption("basePath", null);
100        String directory = config.getOption("directory", config.getName());
101        Path path = getChroniclePath(basePath, directory);
102        String retention = getChronicleRetention(config.getOption("retention", null));
103        return new ChronicleLogManager(path, retention);
104    }
105
106    protected String getChronicleRetention(String retention) {
107        return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
108    }
109
110    protected Path getChroniclePath(String basePath, String name) {
111        if (basePath != null) {
112            return Paths.get(basePath, name).toAbsolutePath();
113        }
114        basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
115        if (basePath != null) {
116            return Paths.get(basePath, name).toAbsolutePath();
117        }
118        basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR);
119        if (basePath != null) {
120            return Paths.get(basePath, "stream", name).toAbsolutePath();
121        }
122        return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath();
123    }
124
125    protected void createStreamIfNotExists(String name, LogConfigDescriptor config) {
126        if (config.getLogsToCreate().isEmpty()) {
127            return;
128        }
129        LogManager manager = getLogManager(name);
130        config.getLogsToCreate().forEach((stream, size) -> {
131            log.info("Create if not exists stream: " + stream + " with manager: " + name);
132            manager.createIfNotExists(stream, size);
133        });
134    }
135
136    @Override
137    public void start(ComponentContext context) {
138        super.start(context);
139        configs.forEach(this::createStreamIfNotExists);
140        processorDescriptors.forEach(this::initProcessor);
141        new ComponentsLifeCycleListener().install();
142    }
143
144    protected void initProcessor(String name, StreamProcessorDescriptor descriptor) {
145        if (processors.containsKey(name)) {
146            log.error("Processor already initialized: " + name);
147            return;
148        }
149        log.info("Init Stream processor: " + name + " with manager: " + descriptor.config);
150        LogManager manager = getLogManager(descriptor.config);
151        Topology topology = descriptor.getTopology();
152        StreamProcessor streamProcessor = new LogStreamProcessor(manager);
153        Settings settings = descriptor.getSettings();
154        if (log.isDebugEnabled()) {
155            log.debug("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings));
156        }
157        streamProcessor.init(topology, settings);
158        processors.put(name, streamProcessor);
159    }
160
161    @Override
162    public void stop(ComponentContext context) throws InterruptedException {
163        super.stop(context);
164        stopComputations(); // should have already be done by the beforeStop listener
165        closeLogManagers();
166    }
167
168    protected void startComputations() {
169        processorDescriptors.keySet().forEach(name -> {
170            StreamProcessor manager = processors.get(name);
171            if (manager != null) {
172                manager.start();
173            }
174        });
175    }
176
177    protected void stopComputations() {
178        processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1)));
179        processors.clear();
180    }
181
182    protected void closeLogManagers() {
183        managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close);
184        managers.clear();
185    }
186
187    @Override
188    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
189        if (extensionPoint.equals(LOG_CONFIG_XP)) {
190            LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution;
191            configs.put(descriptor.name, descriptor);
192            log.debug(String.format("Register logConfig: %s", descriptor.name));
193        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
194            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution;
195            processorDescriptors.put(descriptor.name, descriptor);
196            log.debug(String.format("Register Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
197        }
198    }
199
200    @Override
201    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
202        if (extensionPoint.equals(LOG_CONFIG_XP)) {
203            LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution;
204            configs.remove(descriptor.name);
205            log.debug(String.format("Unregister logConfig: %s", descriptor.name));
206        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
207            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution;
208            processorDescriptors.remove(descriptor.name);
209            log.debug(String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
210        }
211    }
212
213    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {
214        @Override
215        public void afterStart(ComponentManager mgr, boolean isResume) {
216            // this is called once all components are started and ready
217            startComputations();
218        }
219
220        @Override
221        public void beforeStop(ComponentManager mgr, boolean isStandby) {
222            // this is called before components are stopped
223            stopComputations();
224            Framework.getRuntime().getComponentManager().removeListener(this);
225        }
226    }
227}