001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.pubsub;
020
021import java.time.Duration;
022import java.util.List;
023import java.util.Map;
024import java.util.Random;
025import java.util.function.BiConsumer;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.lib.stream.codec.Codec;
030import org.nuxeo.lib.stream.computation.Record;
031import org.nuxeo.lib.stream.log.LogAppender;
032import org.nuxeo.lib.stream.log.LogRecord;
033import org.nuxeo.lib.stream.log.LogTailer;
034import org.nuxeo.lib.stream.log.Name;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.cluster.ClusterService;
037import org.nuxeo.runtime.codec.CodecService;
038import org.nuxeo.runtime.stream.StreamService;
039
040/**
041 * A Pub/Sub provider based on Nuxeo Stream.
042 *
043 * @since 10.1
044 */
045public class StreamPubSubProvider extends AbstractPubSubProvider {
046    private static final Log log = LogFactory.getLog(StreamPubSubProvider.class);
047
048    public static final String GROUP_PREFIX = "pubsub/pubSub-";
049
050    protected static final String LOG_CONFIG_OPT = "logConfig";
051
052    protected static final String DEFAULT_LOG_CONFIG = "default";
053
054    protected static final String LOG_NAME_OPT = "logName";
055
056    protected static final String CODEC_OPT = "codec";
057
058    protected static final String DEFAULT_CODEC = "avroBinary";
059
060    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)
061
062    protected String logConfig;
063
064    protected Name logName;
065
066    protected LogAppender<Record> appender;
067
068    protected Thread thread;
069
070    protected Codec<Record> codec;
071
072    protected String nodeId;
073
074    @Override
075    public void initialize(Map<String, String> options, Map<String, List<BiConsumer<String, byte[]>>> subscribers) {
076        log.debug("Initializing ");
077        super.initialize(options, subscribers);
078        logConfig = options.getOrDefault(LOG_CONFIG_OPT, DEFAULT_LOG_CONFIG);
079        logName = Name.ofUrn(options.get(LOG_NAME_OPT));
080        String codecName = options.getOrDefault(CODEC_OPT, DEFAULT_CODEC);
081        CodecService codecService = Framework.getService(CodecService.class);
082        codec = codecService.getCodec(codecName, Record.class);
083        appender = Framework.getService(StreamService.class).getLogManager(logConfig).getAppender(logName, codec);
084        nodeId = Framework.getService(ClusterService.class).getNodeId();
085        startConsumerThread();
086        log.debug("Initialized");
087    }
088
089    protected void startConsumerThread() {
090        Subscriber subscriber = new Subscriber();
091        thread = new Thread(subscriber, "Nuxeo-PubSub-Stream");
092        thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
093        thread.setPriority(Thread.NORM_PRIORITY);
094        thread.setDaemon(true);
095        thread.start();
096    }
097
098    @Override
099    public void publish(String topic, byte[] message) {
100        appender.append(topic, Record.of(topic, message));
101    }
102
103    @Override
104    public void close() {
105        appender = null;
106        if (thread != null) {
107            thread.interrupt();
108            thread = null;
109            log.debug("Closed");
110        }
111    }
112
113    public class Subscriber implements Runnable {
114
115        @Override
116        public void run() {
117            // using different group name enable fan out
118            Name group = Name.ofUrn(GROUP_PREFIX + nodeId);
119            log.debug("Starting subscriber thread with group: " + group);
120            try (LogTailer<Record> tailer = Framework.getService(StreamService.class)
121                                                     .getLogManager(logConfig)
122                                                     .createTailer(group, logName, codec)) {
123                // Only interested in new messages
124                tailer.toEnd();
125                for (;;) {
126                    try {
127                        LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(5));
128                        if (logRecord == null) {
129                            continue;
130                        }
131                        Record record = logRecord.message();
132                        localPublish(record.getKey(), record.getData());
133                    } catch (InterruptedException e) {
134                        Thread.currentThread().interrupt();
135                        log.debug("Subscriber thread interrupted, exiting");
136                        return;
137                    }
138                }
139            }
140
141        }
142    }
143
144}