001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import java.nio.file.Path;
022import java.nio.file.Paths;
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.logging.log4j.Logger;
030import org.nuxeo.common.Environment;
031import org.nuxeo.lib.stream.StreamRuntimeException;
032import org.nuxeo.lib.stream.codec.Codec;
033import org.nuxeo.lib.stream.computation.Record;
034import org.nuxeo.lib.stream.computation.Settings;
035import org.nuxeo.lib.stream.computation.StreamManager;
036import org.nuxeo.lib.stream.computation.StreamProcessor;
037import org.nuxeo.lib.stream.computation.Topology;
038import org.nuxeo.lib.stream.computation.log.LogStreamManager;
039import org.nuxeo.lib.stream.log.LogConfig;
040import org.nuxeo.lib.stream.log.LogManager;
041import org.nuxeo.lib.stream.log.Name;
042import org.nuxeo.lib.stream.log.UnifiedLogManager;
043import org.nuxeo.lib.stream.log.chronicle.ChronicleLogConfig;
044import org.nuxeo.lib.stream.log.kafka.KafkaLogConfig;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.codec.CodecService;
047import org.nuxeo.runtime.kafka.KafkaConfigService;
048import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
049import org.nuxeo.runtime.model.ComponentContext;
050import org.nuxeo.runtime.model.ComponentManager;
051import org.nuxeo.runtime.model.DefaultComponent;
052
053/**
054 * @since 9.3
055 */
056public class StreamServiceImpl extends DefaultComponent implements StreamService {
057
058    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(StreamServiceImpl.class);
059
060    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
061
062    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
063
064    public static final String DEFAULT_CODEC = "avro";
065
066    protected static final String XP_LOG_CONFIG = "logConfig";
067
068    protected static final String XP_STREAM_PROCESSOR = "streamProcessor";
069
070    protected LogManager logManager;
071
072    protected StreamManager streamManager;
073
074    protected final Map<String, StreamProcessor> processors = new HashMap<>();
075
076    /**
077     * @since 11.2
078     */
079    public static final String STREAM_PROCESSING_ENABLED = "nuxeo.stream.processing.enabled";
080
081    protected Boolean isStreamProcessingDisabled;
082
083    @Override
084    public int getApplicationStartedOrder() {
085        // start after kafka config service
086        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER + 10;
087    }
088
089    @Override
090    public LogManager getLogManager() {
091        // TODO: returns a wrapper that don't expose the LogManager#close
092        return logManager;
093    }
094
095    @Override
096    public StreamManager getStreamManager() {
097        return streamManager;
098    }
099
100    protected String getChronicleRetention(String retention) {
101        return retention != null ? retention : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
102    }
103
104    protected Path getChroniclePath(String basePath) {
105        if (basePath != null) {
106            return Paths.get(basePath).toAbsolutePath();
107        }
108        basePath = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
109        if (basePath != null) {
110            return Paths.get(basePath).toAbsolutePath();
111        }
112        basePath = Framework.getProperty(Environment.NUXEO_DATA_DIR);
113        if (basePath != null) {
114            return Paths.get(basePath, "stream").toAbsolutePath();
115        }
116        return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream").toAbsolutePath();
117    }
118
119    protected void createLogIfNotExists(LogConfigDescriptor config) {
120        if (!config.isEnabled() || config.logs.isEmpty()) {
121            return;
122        }
123        config.logs.forEach(l -> {
124            log.info("Create if not exists stream: {} with manager: {}", l.getId(), config.getId());
125            logManager.createIfNotExists(Name.ofUrn(l.getId()), l.size);
126        });
127    }
128
129    @Override
130    public void start(ComponentContext context) {
131        super.start(context);
132        List<LogConfig> configs = getLogConfigs();
133        logManager = new UnifiedLogManager(configs);
134        streamManager = new LogStreamManager(logManager);
135        List<LogConfigDescriptor> logDescs = getDescriptors(XP_LOG_CONFIG);
136        logDescs.forEach(this::createLogIfNotExists);
137        List<StreamProcessorDescriptor> streamDescs = getDescriptors(XP_STREAM_PROCESSOR);
138        streamDescs.forEach(this::initProcessor);
139        new ComponentsLifeCycleListener().install();
140    }
141
142    protected List<LogConfig> getLogConfigs() {
143        List<LogConfigDescriptor> logDescs = getDescriptors(XP_LOG_CONFIG);
144        List<LogConfig> ret = new ArrayList<>(logDescs.size());
145        for (LogConfigDescriptor desc : logDescs) {
146            if (!desc.isEnabled() || desc.onlyLogDeclaration()) {
147                continue;
148            }
149            if ("kafka".equalsIgnoreCase(desc.type)) {
150                ret.add(createKafkaLogConfig(desc));
151            } else {
152                ret.add(createChronicleLogConfig(desc));
153            }
154        }
155        return ret;
156    }
157
158    protected LogConfig createKafkaLogConfig(LogConfigDescriptor desc) {
159        String kafkaConfig = desc.options.getOrDefault("kafkaConfig", "default");
160        KafkaConfigService service = Framework.getService(KafkaConfigService.class);
161        return new KafkaLogConfig(desc.getId(), desc.isDefault(), desc.getPatterns(),
162                service.getTopicPrefix(kafkaConfig),
163                service.getAdminProperties(kafkaConfig), service.getProducerProperties(kafkaConfig),
164                service.getConsumerProperties(kafkaConfig));
165    }
166
167    protected LogConfig createChronicleLogConfig(LogConfigDescriptor desc) {
168        String basePath = desc.options.getOrDefault("basePath", null);
169        Path path = getChroniclePath(basePath);
170        String retention = getChronicleRetention(desc.options.getOrDefault("retention", null));
171        return new ChronicleLogConfig(desc.getId(), desc.isDefault(), desc.getPatterns(), path, retention);
172    }
173
174    protected void initProcessor(StreamProcessorDescriptor descriptor) {
175        if (! descriptor.isEnabled()) {
176            log.info("Processor {} disabled", descriptor.getId());
177            return;
178        }
179        if (processors.containsKey(descriptor.getId())) {
180            log.error("Processor already initialized: {}", descriptor.getId());
181            return;
182        }
183        log.info("Init Stream processor: {}", descriptor.getId());
184        Topology topology;
185        try {
186            topology = descriptor.klass.getDeclaredConstructor().newInstance().getTopology(descriptor.options);
187        } catch (ReflectiveOperationException e) {
188            throw new StreamRuntimeException("Can not create topology for processor: " + descriptor.getId(), e);
189        }
190        Settings settings = getSettings(descriptor);
191        log.debug("Starting computation topology: {}\n{}", descriptor::getId, () -> topology.toPlantuml(settings));
192        if (!isProcessingDisabled() && descriptor.isStart()) {
193            StreamProcessor streamProcessor = streamManager.registerAndCreateProcessor(descriptor.getId(), topology,
194                    settings);
195            processors.put(descriptor.getId(), streamProcessor);
196        } else {
197            streamManager.register(descriptor.getId(), topology, settings);
198            processors.put(descriptor.getId(), null);
199        }
200    }
201
202    protected Settings getSettings(StreamProcessorDescriptor descriptor) {
203        CodecService codecService = Framework.getService(CodecService.class);
204        Codec<Record> actualCodec = descriptor.defaultCodec == null ? codecService.getCodec(DEFAULT_CODEC, Record.class)
205                : codecService.getCodec(descriptor.defaultCodec, Record.class);
206        Settings settings = new Settings(descriptor.defaultConcurrency, descriptor.defaultPartitions, actualCodec,
207                descriptor.getDefaultPolicy(), null, descriptor.defaultExternal);
208        descriptor.computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency));
209        descriptor.policies.forEach(policy -> settings.setPolicy(policy.name, descriptor.getPolicy(policy.name)));
210        for (StreamProcessorDescriptor.StreamDescriptor streamDescriptor : descriptor.streams) {
211            settings.setPartitions(streamDescriptor.name,
212                    streamDescriptor.partitions != null ? streamDescriptor.partitions : descriptor.defaultPartitions);
213            if (streamDescriptor.codec != null) {
214                settings.setCodec(streamDescriptor.name, codecService.getCodec(streamDescriptor.codec, Record.class));
215            }
216            streamDescriptor.filters.forEach(filter -> settings.addFilter(streamDescriptor.name, filter.getFilter()));
217            settings.setExternal(Name.ofUrn(streamDescriptor.name),
218                    streamDescriptor.external != null ? streamDescriptor.external : descriptor.defaultExternal);
219        }
220        return settings;
221    }
222
223    @Override
224    public void stop(ComponentContext context) throws InterruptedException {
225        super.stop(context);
226        logManager.close();
227    }
228
229    protected void startProcessors() {
230        log.debug("Start processors");
231        getDescriptors(XP_STREAM_PROCESSOR).forEach(d -> {
232            StreamProcessor processor = processors.get(d.getId());
233            if (processor != null) {
234                processor.start();
235            }
236        });
237    }
238
239    @Override
240    public void stopProcessors() {
241        log.debug("Stop processors");
242        processors.forEach((name, processor) -> {
243            if (processor != null) {
244                processor.stop(Duration.ofSeconds(1));
245            }
246        });
247        processors.clear();
248    }
249
250    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {
251        @Override
252        public void afterStart(ComponentManager mgr, boolean isResume) {
253            // this is called once all components are started and ready
254            log.debug("afterStart");
255            startProcessors();
256        }
257
258        @Override
259        public void beforeStop(ComponentManager mgr, boolean isStandby) {
260            // this is called before components are stopped
261            log.debug("beforeStop");
262            stopProcessors();
263            Framework.getRuntime().getComponentManager().removeListener(this);
264        }
265    }
266
267    protected boolean isProcessingDisabled() {
268        if (isStreamProcessingDisabled == null) {
269            if (Framework.isBooleanPropertyFalse(STREAM_PROCESSING_ENABLED)) {
270                log.warn("Stream Processing has been disabled on this node");
271                isStreamProcessingDisabled = true;
272            } else {
273                isStreamProcessingDisabled = false;
274            }
275        }
276        return isStreamProcessingDisabled;
277    }
278}