001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.pubsub; 020 021import java.time.Duration; 022import java.util.List; 023import java.util.Map; 024import java.util.Random; 025import java.util.function.BiConsumer; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.lib.stream.codec.Codec; 030import org.nuxeo.lib.stream.computation.Record; 031import org.nuxeo.lib.stream.log.LogAppender; 032import org.nuxeo.lib.stream.log.LogRecord; 033import org.nuxeo.lib.stream.log.LogTailer; 034import org.nuxeo.lib.stream.log.Name; 035import org.nuxeo.runtime.api.Framework; 036import org.nuxeo.runtime.cluster.ClusterService; 037import org.nuxeo.runtime.codec.CodecService; 038import org.nuxeo.runtime.stream.StreamService; 039 040/** 041 * A Pub/Sub provider based on Nuxeo Stream. 042 * 043 * @since 10.1 044 */ 045public class StreamPubSubProvider extends AbstractPubSubProvider { 046 private static final Log log = LogFactory.getLog(StreamPubSubProvider.class); 047 048 public static final String GROUP_PREFIX = "pubsub/pubSub-"; 049 050 protected static final String LOG_CONFIG_OPT = "logConfig"; 051 052 protected static final String DEFAULT_LOG_CONFIG = "default"; 053 054 protected static final String LOG_NAME_OPT = "logName"; 055 056 protected static final String CODEC_OPT = "codec"; 057 058 protected static final String DEFAULT_CODEC = "avroBinary"; 059 060 protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength) 061 062 protected String logConfig; 063 064 protected Name logName; 065 066 protected LogAppender<Record> appender; 067 068 protected Thread thread; 069 070 protected Codec<Record> codec; 071 072 protected String nodeId; 073 074 @Override 075 public void initialize(Map<String, String> options, Map<String, List<BiConsumer<String, byte[]>>> subscribers) { 076 log.debug("Initializing "); 077 super.initialize(options, subscribers); 078 logConfig = options.getOrDefault(LOG_CONFIG_OPT, DEFAULT_LOG_CONFIG); 079 logName = Name.ofUrn(options.get(LOG_NAME_OPT)); 080 String codecName = options.getOrDefault(CODEC_OPT, DEFAULT_CODEC); 081 CodecService codecService = Framework.getService(CodecService.class); 082 codec = codecService.getCodec(codecName, Record.class); 083 appender = Framework.getService(StreamService.class).getLogManager(logConfig).getAppender(logName, codec); 084 nodeId = Framework.getService(ClusterService.class).getNodeId(); 085 startConsumerThread(); 086 log.debug("Initialized"); 087 } 088 089 protected void startConsumerThread() { 090 Subscriber subscriber = new Subscriber(); 091 thread = new Thread(subscriber, "Nuxeo-PubSub-Stream"); 092 thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 093 thread.setPriority(Thread.NORM_PRIORITY); 094 thread.setDaemon(true); 095 thread.start(); 096 } 097 098 @Override 099 public void publish(String topic, byte[] message) { 100 appender.append(topic, Record.of(topic, message)); 101 } 102 103 @Override 104 public void close() { 105 appender = null; 106 if (thread != null) { 107 thread.interrupt(); 108 thread = null; 109 log.debug("Closed"); 110 } 111 } 112 113 public class Subscriber implements Runnable { 114 115 @Override 116 public void run() { 117 // using different group name enable fan out 118 Name group = Name.ofUrn(GROUP_PREFIX + nodeId); 119 log.debug("Starting subscriber thread with group: " + group); 120 try (LogTailer<Record> tailer = Framework.getService(StreamService.class) 121 .getLogManager(logConfig) 122 .createTailer(group, logName, codec)) { 123 // Only interested in new messages 124 tailer.toEnd(); 125 for (;;) { 126 try { 127 LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(5)); 128 if (logRecord == null) { 129 continue; 130 } 131 Record record = logRecord.message(); 132 localPublish(record.getKey(), record.getData()); 133 } catch (InterruptedException e) { 134 Thread.currentThread().interrupt(); 135 log.debug("Subscriber thread interrupted, exiting"); 136 return; 137 } 138 } 139 } 140 141 } 142 } 143 144}