001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.pubsub;
020
021import java.time.Duration;
022import java.util.List;
023import java.util.Map;
024import java.util.Random;
025import java.util.function.BiConsumer;
026
027import org.apache.commons.lang3.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.lib.stream.codec.Codec;
031import org.nuxeo.lib.stream.computation.Record;
032import org.nuxeo.lib.stream.log.LogAppender;
033import org.nuxeo.lib.stream.log.LogRecord;
034import org.nuxeo.lib.stream.log.LogTailer;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.codec.CodecService;
037import org.nuxeo.runtime.stream.StreamService;
038
039/**
040 * A Pub/Sub provider based on Nuxeo Stream.
041 *
042 * @since 10.1
043 */
044public class StreamPubSubProvider extends AbstractPubSubProvider {
045    private static final Log log = LogFactory.getLog(StreamPubSubProvider.class);
046
047    public static final String GROUP_PREFIX = "pub-sub-node-";
048
049    protected static final String NODE_ID_PROP = "repository.clustering.id";
050
051    protected static final String LOG_CONFIG_OPT = "logConfig";
052
053    protected static final String DEFAULT_LOG_CONFIG = "default";
054
055    protected static final String LOG_NAME_OPT = "logName";
056
057    protected static final String CODEC_OPT = "codec";
058
059    protected static final String DEFAULT_CODEC = "avroBinary";
060
061    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)
062
063    protected String logConfig;
064
065    protected String logName;
066
067    protected LogAppender<Record> appender;
068
069    protected Thread thread;
070
071    protected Codec<Record> codec;
072
073    @Override
074    public void initialize(Map<String, String> options, Map<String, List<BiConsumer<String, byte[]>>> subscribers) {
075        log.debug("Initializing ");
076        super.initialize(options, subscribers);
077        logConfig = options.getOrDefault(LOG_CONFIG_OPT, DEFAULT_LOG_CONFIG);
078        logName = options.get(LOG_NAME_OPT);
079        if (StringUtils.isBlank(logName)) {
080            throw new IllegalArgumentException("Missing option logName in StreamPubSubProviderDescriptor");
081        }
082        String codecName = options.getOrDefault(CODEC_OPT, DEFAULT_CODEC);
083        CodecService codecService = Framework.getService(CodecService.class);
084        codec = codecService.getCodec(codecName, Record.class);
085        appender = Framework.getService(StreamService.class).getLogManager(logConfig).getAppender(logName, codec);
086        startConsumerThread();
087        log.debug("Initialized");
088    }
089
090    protected void startConsumerThread() {
091        Subscriber subscriber = new Subscriber();
092        thread = new Thread(subscriber, "Nuxeo-PubSub-Stream");
093        thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
094        thread.setPriority(Thread.NORM_PRIORITY);
095        thread.setDaemon(true);
096        thread.start();
097    }
098
099    @Override
100    public void publish(String topic, byte[] message) {
101        appender.append(topic, Record.of(topic, message));
102    }
103
104    @Override
105    public void close() {
106        appender = null;
107        if (thread != null) {
108            thread.interrupt();
109            thread = null;
110            log.debug("Closed");
111        }
112    }
113
114    public class Subscriber implements Runnable {
115
116        @Override
117        public void run() {
118            // using different group name enable fan out
119            String group = GROUP_PREFIX + getNodeId();
120            log.debug("Starting subscriber thread with group: " + group);
121            try (LogTailer<Record> tailer = Framework.getService(StreamService.class)
122                                                     .getLogManager(logConfig)
123                                                     .createTailer(group, logName, codec)) {
124                // Only interested in new messages
125                tailer.toEnd();
126                for (;;) {
127                    try {
128                        LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(5));
129                        if (logRecord == null) {
130                            continue;
131                        }
132                        Record record = logRecord.message();
133                        localPublish(record.getKey(), record.getData());
134                    } catch (InterruptedException e) {
135                        Thread.currentThread().interrupt();
136                        log.debug("Subscriber thread interrupted, exiting");
137                        return;
138                    }
139                }
140            }
141
142        }
143    }
144
145    protected String getNodeId() {
146        String nodeId = Framework.getProperty(NODE_ID_PROP);
147        if (StringUtils.isBlank(nodeId)) {
148            return String.valueOf(RANDOM.nextLong());
149        }
150        return nodeId.trim();
151    }
152}