001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import java.nio.file.Path;
022import java.nio.file.Paths;
023import java.time.Duration;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Objects;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.common.Environment;
031import org.nuxeo.lib.stream.computation.Settings;
032import org.nuxeo.lib.stream.computation.StreamProcessor;
033import org.nuxeo.lib.stream.computation.Topology;
034import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.codec.CodecService;
040import org.nuxeo.runtime.kafka.KafkaConfigService;
041import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
042import org.nuxeo.runtime.model.ComponentContext;
043import org.nuxeo.runtime.model.ComponentInstance;
044import org.nuxeo.runtime.model.ComponentManager;
045import org.nuxeo.runtime.model.DefaultComponent;
046
047/**
048 * @since 9.3
049 */
050public class StreamServiceImpl extends DefaultComponent implements StreamService {
051    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
052
053    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
054
055    protected static final String LOG_CONFIG_XP = "logConfig";
056
057    protected static final String STREAM_PROCESSOR_XP = "streamProcessor";
058
059    private static final Log log = LogFactory.getLog(StreamServiceImpl.class);
060
061    protected final Map<String, LogConfigDescriptor> configs = new HashMap<>();
062
063    protected final Map<String, LogManager> managers = new HashMap<>();
064
065    protected final Map<String, StreamProcessor> processors = new HashMap<>();
066
067    protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap<>();
068
069    @Override
070    public int getApplicationStartedOrder() {
071        // start after kafka config service
072        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10;
073    }
074
075    @Override
076    public LogManager getLogManager(String name) {
077        // TODO: returns a wrapper that don't expose the LogManager#close
078        if (!managers.containsKey(name)) {
079            if (!configs.containsKey(name)) {
080                throw new IllegalArgumentException("Unknown logConfig: " + name);
081            }
082            LogConfigDescriptor config = configs.get(name);
083            if (config.isKafkaLog()) {
084                managers.put(name, createKafkaLogManager(config));
085            } else {
086                managers.put(name, createChronicleLogManager(config));
087            }
088        }
089        return managers.get(name);
090    }
091
092    protected LogManager createKafkaLogManager(LogConfigDescriptor config) {
093        String kafkaConfig = config.getOption("kafkaConfig", "default");
094        KafkaConfigService service = Framework.getService(KafkaConfigService.class);
095        return new KafkaLogManager(service.getTopicPrefix(kafkaConfig), service.getProducerProperties(kafkaConfig),
096                service.getConsumerProperties(kafkaConfig));
097    }
098
099    protected LogManager createChronicleLogManager(LogConfigDescriptor config) {
100        String basePath = config.getOption("basePath", null);
101        String directory = config.getOption("directory", config.getName());
102        Path path = getChroniclePath(basePath, directory);
103        String retention = getChronicleRetention(config.getOption("retention", null));
104        return new ChronicleLogManager(path, retention);
105    }
106
107    protected String getChronicleRetention(String retention) {
108        return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
109    }
110
111    protected Path getChroniclePath(String basePath, String name) {
112        if (basePath != null) {
113            return Paths.get(basePath, name).toAbsolutePath();
114        }
115        basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
116        if (basePath != null) {
117            return Paths.get(basePath, name).toAbsolutePath();
118        }
119        basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR);
120        if (basePath != null) {
121            return Paths.get(basePath, "stream", name).toAbsolutePath();
122        }
123        return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath();
124    }
125
126    protected void createStreamIfNotExists(String name, LogConfigDescriptor config) {
127        if (config.getLogsToCreate().isEmpty()) {
128            return;
129        }
130        LogManager manager = getLogManager(name);
131        config.getLogsToCreate().forEach((stream, size) -> {
132            log.info("Create if not exists stream: " + stream + " with manager: " + name);
133            manager.createIfNotExists(stream, size);
134        });
135    }
136
137    @Override
138    public void start(ComponentContext context) {
139        super.start(context);
140        configs.forEach(this::createStreamIfNotExists);
141        processorDescriptors.forEach(this::initProcessor);
142        new ComponentsLifeCycleListener().install();
143    }
144
145    protected void initProcessor(String name, StreamProcessorDescriptor descriptor) {
146        if (processors.containsKey(name)) {
147            log.error("Processor already initialized: " + name);
148            return;
149        }
150        log.info("Init Stream processor: " + name + " with manager: " + descriptor.config);
151        LogManager manager = getLogManager(descriptor.config);
152        Topology topology = descriptor.getTopology();
153        StreamProcessor streamProcessor = new LogStreamProcessor(manager);
154        Settings settings = descriptor.getSettings(Framework.getService(CodecService.class));
155        if (log.isDebugEnabled()) {
156            log.debug("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings));
157        }
158        streamProcessor.init(topology, settings);
159        processors.put(name, streamProcessor);
160    }
161
162    @Override
163    public void stop(ComponentContext context) throws InterruptedException {
164        super.stop(context);
165        stopComputations(); // should have already be done by the beforeStop listener
166        closeLogManagers();
167    }
168
169    protected void startComputations() {
170        processorDescriptors.keySet().forEach(name -> {
171            StreamProcessor manager = processors.get(name);
172            if (manager != null) {
173                manager.start();
174            }
175        });
176    }
177
178    protected void stopComputations() {
179        processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1)));
180        processors.clear();
181    }
182
183    protected void closeLogManagers() {
184        managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close);
185        managers.clear();
186    }
187
188    @Override
189    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
190        if (extensionPoint.equals(LOG_CONFIG_XP)) {
191            LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution;
192            configs.put(descriptor.name, descriptor);
193            log.debug(String.format("Register logConfig: %s", descriptor.name));
194        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
195            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution;
196            processorDescriptors.put(descriptor.name, descriptor);
197            log.debug(String.format("Register Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
198        }
199    }
200
201    @Override
202    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
203        if (extensionPoint.equals(LOG_CONFIG_XP)) {
204            LogConfigDescriptor descriptor = (LogConfigDescriptor) contribution;
205            configs.remove(descriptor.name);
206            log.debug(String.format("Unregister logConfig: %s", descriptor.name));
207        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
208            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor) contribution;
209            processorDescriptors.remove(descriptor.name);
210            log.debug(String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
211        }
212    }
213
214    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {
215        @Override
216        public void afterStart(ComponentManager mgr, boolean isResume) {
217            // this is called once all components are started and ready
218            startComputations();
219        }
220
221        @Override
222        public void beforeStop(ComponentManager mgr, boolean isStandby) {
223            // this is called before components are stopped
224            stopComputations();
225            Framework.getRuntime().getComponentManager().removeListener(this);
226        }
227    }
228}