001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import java.nio.file.Path;
022import java.nio.file.Paths;
023import java.time.Duration;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Objects;
028
029import org.apache.logging.log4j.Logger;
030import org.nuxeo.common.Environment;
031import org.nuxeo.lib.stream.StreamRuntimeException;
032import org.nuxeo.lib.stream.codec.Codec;
033import org.nuxeo.lib.stream.computation.Record;
034import org.nuxeo.lib.stream.computation.Settings;
035import org.nuxeo.lib.stream.computation.StreamProcessor;
036import org.nuxeo.lib.stream.computation.Topology;
037import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
038import org.nuxeo.lib.stream.log.LogManager;
039import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
040import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.codec.CodecService;
043import org.nuxeo.runtime.kafka.KafkaConfigService;
044import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
045import org.nuxeo.runtime.model.ComponentContext;
046import org.nuxeo.runtime.model.ComponentManager;
047import org.nuxeo.runtime.model.DefaultComponent;
048
049/**
050 * @since 9.3
051 */
052public class StreamServiceImpl extends DefaultComponent implements StreamService {
053
054    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(StreamServiceImpl.class);
055
056    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
057
058    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
059
060    public static final String DEFAULT_CODEC = "avro";
061
062    protected static final String XP_LOG_CONFIG = "logConfig";
063
064    protected static final String XP_STREAM_PROCESSOR = "streamProcessor";
065
066    protected final Map<String, LogManager> managers = new HashMap<>();
067
068    protected final Map<String, StreamProcessor> processors = new HashMap<>();
069
070    @Override
071    public int getApplicationStartedOrder() {
072        // start after kafka config service
073        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10;
074    }
075
076    @Override
077    public LogManager getLogManager(String name) {
078        // TODO: returns a wrapper that don't expose the LogManager#close
079        if (!managers.containsKey(name)) {
080            LogConfigDescriptor config = getDescriptor(XP_LOG_CONFIG, name);
081            if (config == null) {
082                throw new IllegalArgumentException("Unknown logConfig: " + name);
083            }
084            if ("kafka".equalsIgnoreCase(config.type)) {
085                managers.put(name, createKafkaLogManager(config));
086            } else {
087                managers.put(name, createChronicleLogManager(config));
088            }
089        }
090        return managers.get(name);
091    }
092
093    protected LogManager createKafkaLogManager(LogConfigDescriptor config) {
094        String kafkaConfig = config.options.getOrDefault("kafkaConfig", "default");
095        KafkaConfigService service = Framework.getService(KafkaConfigService.class);
096        return new KafkaLogManager(service.getTopicPrefix(kafkaConfig), service.getProducerProperties(kafkaConfig),
097                service.getConsumerProperties(kafkaConfig));
098    }
099
100    protected LogManager createChronicleLogManager(LogConfigDescriptor config) {
101        String basePath = config.options.getOrDefault("basePath", null);
102        String directory = config.options.getOrDefault("directory", config.getId());
103        Path path = getChroniclePath(basePath, directory);
104        String retention = getChronicleRetention(config.options.getOrDefault("retention", null));
105        return new ChronicleLogManager(path, retention);
106    }
107
108    protected String getChronicleRetention(String retention) {
109        return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
110    }
111
112    protected Path getChroniclePath(String basePath, String name) {
113        if (basePath != null) {
114            return Paths.get(basePath, name).toAbsolutePath();
115        }
116        basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
117        if (basePath != null) {
118            return Paths.get(basePath, name).toAbsolutePath();
119        }
120        basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR);
121        if (basePath != null) {
122            return Paths.get(basePath, "stream", name).toAbsolutePath();
123        }
124        return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath();
125    }
126
127    protected void createStreamIfNotExists(LogConfigDescriptor config) {
128        if (config.logs.isEmpty()) {
129            return;
130        }
131        LogManager manager = getLogManager(config.getId());
132        config.logs.forEach(l -> {
133            log.info("Create if not exists stream: {} with manager: {}", l.getId(), config.getId());
134            manager.createIfNotExists(l.getId(), l.size);
135        });
136    }
137
138    @Override
139    public void start(ComponentContext context) {
140        super.start(context);
141        List<LogConfigDescriptor> logDescs = getDescriptors(XP_LOG_CONFIG);
142        logDescs.forEach(this::createStreamIfNotExists);
143        List<StreamProcessorDescriptor> streamDescs = getDescriptors(XP_STREAM_PROCESSOR);
144        streamDescs.forEach(this::initProcessor);
145        new ComponentsLifeCycleListener().install();
146    }
147
148    protected void initProcessor(StreamProcessorDescriptor descriptor) {
149        if (processors.containsKey(descriptor.getId())) {
150            log.error("Processor already initialized: {}", descriptor.getId());
151            return;
152        }
153        log.info("Init Stream processor: {} with manager: {}", descriptor.getId(), descriptor.config);
154        LogManager manager = getLogManager(descriptor.config);
155        Topology topology;
156        try {
157            topology = descriptor.klass.getDeclaredConstructor().newInstance().getTopology(descriptor.options);
158        } catch (ReflectiveOperationException e) {
159            throw new StreamRuntimeException("Can not create topology for processor: " + descriptor.getId(), e);
160        }
161        StreamProcessor streamProcessor = new LogStreamProcessor(manager);
162        Settings settings = getSettings(descriptor);
163        log.debug("Starting computation topology: {}\n{}", descriptor::getId, () -> topology.toPlantuml(settings));
164        streamProcessor.init(topology, settings);
165        processors.put(descriptor.getId(), streamProcessor);
166    }
167
168    protected Settings getSettings(StreamProcessorDescriptor descriptor) {
169        CodecService codecService = Framework.getService(CodecService.class);
170        Codec<Record> actualCodec = descriptor.defaultCodec == null ? codecService.getCodec(DEFAULT_CODEC, Record.class)
171                : codecService.getCodec(descriptor.defaultCodec, Record.class);
172        Settings settings = new Settings(descriptor.defaultConcurrency, descriptor.defaultPartitions, actualCodec,
173                descriptor.getDefaultPolicy());
174        descriptor.computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency));
175        descriptor.policies.forEach(policy -> settings.setPolicy(policy.name, descriptor.getPolicy(policy.name)));
176        descriptor.streams.forEach(stream -> settings.setPartitions(stream.name, stream.partitions));
177        descriptor.streams.stream()
178                          .filter(stream -> Objects.nonNull(stream.codec))
179                          .forEach(stream -> settings.setCodec(stream.name,
180                                  codecService.getCodec(stream.codec, Record.class)));
181        return settings;
182    }
183
184    @Override
185    public void stop(ComponentContext context) throws InterruptedException {
186        super.stop(context);
187        stopComputations(); // should have already be done by the beforeStop listener
188        closeLogManagers();
189    }
190
191    protected void startComputations() {
192        getDescriptors(XP_STREAM_PROCESSOR).forEach(d -> {
193            StreamProcessor manager = processors.get(d.getId());
194            if (manager != null) {
195                manager.start();
196            }
197        });
198    }
199
200    protected void stopComputations() {
201        processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1)));
202        processors.clear();
203    }
204
205    protected void closeLogManagers() {
206        managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close);
207        managers.clear();
208    }
209
210    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {
211        @Override
212        public void afterStart(ComponentManager mgr, boolean isResume) {
213            // this is called once all components are started and ready
214            startComputations();
215        }
216
217        @Override
218        public void beforeStop(ComponentManager mgr, boolean isStandby) {
219            // this is called before components are stopped
220            stopComputations();
221            Framework.getRuntime().getComponentManager().removeListener(this);
222        }
223    }
224}