001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import static redis.clients.jedis.Protocol.Keyword.MESSAGE; 022import static redis.clients.jedis.Protocol.Keyword.PMESSAGE; 023import static redis.clients.jedis.Protocol.Keyword.PSUBSCRIBE; 024import static redis.clients.jedis.Protocol.Keyword.PUNSUBSCRIBE; 025import static redis.clients.jedis.Protocol.Keyword.SUBSCRIBE; 026import static redis.clients.jedis.Protocol.Keyword.UNSUBSCRIBE; 027 028import java.util.Arrays; 029import java.util.List; 030import java.util.Map; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.function.BiConsumer; 037 038import org.apache.commons.lang3.reflect.MethodUtils; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.redis.RedisAdmin; 043import org.nuxeo.ecm.core.redis.RedisExecutor; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.pubsub.AbstractPubSubProvider; 046import org.nuxeo.runtime.pubsub.PubSubProvider; 047 048import redis.clients.jedis.Client; 049import redis.clients.jedis.JedisPubSub; 050import redis.clients.jedis.exceptions.JedisException; 051import redis.clients.util.SafeEncoder; 052 053/** 054 * Redis implementation of {@link PubSubProvider}. 055 * 056 * @since 9.1 057 */ 058public class RedisPubSubProvider extends AbstractPubSubProvider { 059 060 // package-private to avoid synthetic accessor for nested class 061 static final Log log = LogFactory.getLog(RedisPubSubProvider.class); 062 063 /** Maximum delay to wait for a channel subscription on startup. */ 064 public static final long TIMEOUT_SUBSCRIBE_SECONDS = 5; 065 066 protected static final String THREAD_NAME = "Nuxeo-PubSub-Redis"; 067 068 protected static final AtomicInteger THREAD_NUMBER = new AtomicInteger(); 069 070 protected static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool( 071 r -> new Thread(r, THREAD_NAME + "-" + THREAD_NUMBER.incrementAndGet())); 072 073 protected Dispatcher dispatcher; 074 075 protected Thread thread; 076 077 @Override 078 public void initialize(Map<String, String> options, Map<String, List<BiConsumer<String, byte[]>>> subscribers) { 079 super.initialize(options, subscribers); 080 log.debug("Initializing"); 081 namespace = Framework.getService(RedisAdmin.class).namespace(); 082 dispatcher = new Dispatcher(namespace + "*"); 083 thread = new Thread(dispatcher::run, THREAD_NAME); 084 thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 085 thread.setPriority(Thread.NORM_PRIORITY); 086 thread.setDaemon(true); 087 thread.start(); 088 if (!dispatcher.awaitSubscribed(TIMEOUT_SUBSCRIBE_SECONDS, TimeUnit.SECONDS)) { 089 thread.interrupt(); 090 throw new NuxeoException( 091 "Failed to subscribe to Redis pubsub after " + TIMEOUT_SUBSCRIBE_SECONDS + "s"); 092 } 093 log.debug("Initialized"); 094 } 095 096 @Override 097 public void close() { 098 log.debug("Closing"); 099 if (dispatcher != null) { 100 thread.interrupt(); 101 thread = null; 102 dispatcher.close(); 103 dispatcher = null; 104 } 105 log.debug("Closed"); 106 } 107 108 /** 109 * Subscribes to the provided Redis channel pattern and dispatches received messages. Method {@code #run} must be 110 * called in a new thread. 111 */ 112 public class Dispatcher extends JedisPubSub { 113 114 // we look this up during construction in the main thread, 115 // because service lookup is unavailable from alternative threads during startup 116 protected RedisExecutor redisExecutor; 117 118 protected final String pattern; 119 120 protected final CountDownLatch subscribedLatch; 121 122 protected volatile boolean stop; 123 124 public Dispatcher(String pattern) { 125 redisExecutor = Framework.getService(RedisExecutor.class); 126 this.pattern = pattern; 127 this.subscribedLatch = new CountDownLatch(1); 128 } 129 130 /** 131 * To be called from the main thread to wait for subscription to be effective. 132 */ 133 public boolean awaitSubscribed(long timeout, TimeUnit unit) { 134 try { 135 return subscribedLatch.await(timeout, unit); 136 } catch (InterruptedException e) { 137 Thread.currentThread().interrupt(); 138 throw new NuxeoException(e); 139 } 140 } 141 142 /** 143 * To be called from a new thread to do the actual Redis subscription and to dispatch messages. 144 */ 145 public void run() { 146 log.debug("Subscribing to: " + pattern); 147 // we can't do service lookup during startup here because we're in a separate thread 148 RedisExecutor redisExecutor = this.redisExecutor; 149 this.redisExecutor = null; 150 redisExecutor.psubscribe(this, pattern); 151 } 152 153 /** 154 * To be called from the main thread to stop the subscription. 155 */ 156 public void close() { 157 stop = true; 158 // send an empty message so that the dispatcher thread can be woken up and stop 159 publish("", new byte[0]); 160 } 161 162 @Override 163 public void onPSubscribe(String pattern, int subscribedChannels) { 164 subscribedLatch.countDown(); 165 if (log.isDebugEnabled()) { 166 log.debug("Subscribed to: " + pattern); 167 } 168 } 169 170 public void onMessage(String channel, byte[] message) { 171 if (message == null) { 172 message = new byte[0]; 173 } 174 if (log.isTraceEnabled()) { 175 log.trace("Message received from channel: " + channel + " (" + message.length + " bytes)"); 176 } 177 String topic = channel.substring(namespace.length()); 178 // localPublish needs to be called in a different thread, 179 // so that if a subscriber calls Redis it doesn't reuse our current Redis connection 180 // which can only be used for subscribe/unsubscribe/ping commands. 181 final byte[] finalMessage = message; 182 THREAD_POOL.execute(() -> localPublish(topic, finalMessage)); 183 } 184 185 public void onPMessage(String pattern, String channel, byte[] message) { 186 onMessage(channel, message); 187 } 188 189 @Override 190 public void proceed(Client client, String... channels) { 191 client.subscribe(channels); 192 flush(client); 193 processBinary(client); 194 } 195 196 @Override 197 public void proceedWithPatterns(Client client, String... patterns) { 198 client.psubscribe(patterns); 199 flush(client); 200 processBinary(client); 201 } 202 203 // stupid Jedis has a protected flush method 204 protected void flush(Client client) { 205 try { 206 MethodUtils.invokeMethod(client, true, "flush"); 207 } catch (ReflectiveOperationException e) { 208 throw new NuxeoException(e); 209 } 210 } 211 212 // patched process() to pass the raw binary message to onMessage and onPMessage 213 protected void processBinary(Client client) { 214 for (;;) { 215 List<Object> reply = client.getRawObjectMultiBulkReply(); 216 if (stop) { 217 return; 218 } 219 Object type = reply.get(0); 220 if (!(type instanceof byte[])) { 221 throw new JedisException("Unknown message type: " + type); 222 } 223 byte[] btype = (byte[]) type; 224 if (Arrays.equals(MESSAGE.raw, btype)) { 225 byte[] bchannel = (byte[]) reply.get(1); 226 byte[] bmesg = (byte[]) reply.get(2); 227 onMessage(toString(bchannel), bmesg); 228 } else if (Arrays.equals(PMESSAGE.raw, btype)) { 229 byte[] bpattern = (byte[]) reply.get(1); 230 byte[] bchannel = (byte[]) reply.get(2); 231 byte[] bmesg = (byte[]) reply.get(3); 232 onPMessage(toString(bpattern), toString(bchannel), bmesg); 233 } else if (Arrays.equals(SUBSCRIBE.raw, btype)) { 234 byte[] bchannel = (byte[]) reply.get(1); 235 onSubscribe(toString(bchannel), 0); 236 } else if (Arrays.equals(PSUBSCRIBE.raw, btype)) { 237 byte[] bpattern = (byte[]) reply.get(1); 238 onPSubscribe(toString(bpattern), 0); 239 } else if (Arrays.equals(UNSUBSCRIBE.raw, btype)) { 240 byte[] bchannel = (byte[]) reply.get(1); 241 onUnsubscribe(toString(bchannel), 0); 242 } else if (Arrays.equals(PUNSUBSCRIBE.raw, btype)) { 243 byte[] bpattern = (byte[]) reply.get(1); 244 onPUnsubscribe(toString(bpattern), 0); 245 } else { 246 throw new JedisException("Unknown message: " + toString(btype)); 247 } 248 } 249 } 250 251 protected String toString(byte[] bytes) { 252 return bytes == null ? null : SafeEncoder.encode(bytes); 253 } 254 255 } 256 257 // ===== PubSubService ===== 258 259 @Override 260 public void publish(String topic, byte[] message) { 261 String channel = namespace + topic; 262 byte[] bchannel = SafeEncoder.encode(channel); 263 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 264 if (redisExecutor != null) { 265 redisExecutor.execute(jedis -> jedis.publish(bchannel, message)); 266 } 267 } 268 269}