001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import static redis.clients.jedis.Protocol.Keyword.MESSAGE;
022import static redis.clients.jedis.Protocol.Keyword.PMESSAGE;
023import static redis.clients.jedis.Protocol.Keyword.PSUBSCRIBE;
024import static redis.clients.jedis.Protocol.Keyword.PUNSUBSCRIBE;
025import static redis.clients.jedis.Protocol.Keyword.SUBSCRIBE;
026import static redis.clients.jedis.Protocol.Keyword.UNSUBSCRIBE;
027
028import java.util.Arrays;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.function.BiConsumer;
037
038import org.apache.commons.lang3.reflect.MethodUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.redis.RedisAdmin;
043import org.nuxeo.ecm.core.redis.RedisExecutor;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.pubsub.AbstractPubSubProvider;
046import org.nuxeo.runtime.pubsub.PubSubProvider;
047
048import redis.clients.jedis.Client;
049import redis.clients.jedis.JedisPubSub;
050import redis.clients.jedis.exceptions.JedisException;
051import redis.clients.util.SafeEncoder;
052
053/**
054 * Redis implementation of {@link PubSubProvider}.
055 *
056 * @since 9.1
057 */
058public class RedisPubSubProvider extends AbstractPubSubProvider {
059
060    // package-private to avoid synthetic accessor for nested class
061    static final Log log = LogFactory.getLog(RedisPubSubProvider.class);
062
063    /** Maximum delay to wait for a channel subscription on startup. */
064    public static final long TIMEOUT_SUBSCRIBE_SECONDS = 5;
065
066    protected static final String THREAD_NAME = "Nuxeo-PubSub-Redis";
067
068    protected static final AtomicInteger THREAD_NUMBER = new AtomicInteger();
069
070    protected static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool(
071            r -> new Thread(r, THREAD_NAME + "-" + THREAD_NUMBER.incrementAndGet()));
072
073    protected Dispatcher dispatcher;
074
075    protected Thread thread;
076
077    @Override
078    public void initialize(Map<String, String> options, Map<String, List<BiConsumer<String, byte[]>>> subscribers) {
079        super.initialize(options, subscribers);
080        log.debug("Initializing");
081        namespace = Framework.getService(RedisAdmin.class).namespace();
082        dispatcher = new Dispatcher(namespace + "*");
083        thread = new Thread(dispatcher::run, THREAD_NAME);
084        thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
085        thread.setPriority(Thread.NORM_PRIORITY);
086        thread.setDaemon(true);
087        thread.start();
088        if (!dispatcher.awaitSubscribed(TIMEOUT_SUBSCRIBE_SECONDS, TimeUnit.SECONDS)) {
089            thread.interrupt();
090            throw new NuxeoException(
091                    "Failed to subscribe to Redis pubsub after " + TIMEOUT_SUBSCRIBE_SECONDS + "s");
092        }
093        log.debug("Initialized");
094    }
095
096    @Override
097    public void close() {
098        log.debug("Closing");
099        if (dispatcher != null) {
100            thread.interrupt();
101            thread = null;
102            dispatcher.close();
103            dispatcher = null;
104        }
105        log.debug("Closed");
106    }
107
108    /**
109     * Subscribes to the provided Redis channel pattern and dispatches received messages. Method {@code #run} must be
110     * called in a new thread.
111     */
112    public class Dispatcher extends JedisPubSub {
113
114        // we look this up during construction in the main thread,
115        // because service lookup is unavailable from alternative threads during startup
116        protected RedisExecutor redisExecutor;
117
118        protected final String pattern;
119
120        protected final CountDownLatch subscribedLatch;
121
122        protected volatile boolean stop;
123
124        public Dispatcher(String pattern) {
125            redisExecutor = Framework.getService(RedisExecutor.class);
126            this.pattern = pattern;
127            this.subscribedLatch = new CountDownLatch(1);
128        }
129
130        /**
131         * To be called from the main thread to wait for subscription to be effective.
132         */
133        public boolean awaitSubscribed(long timeout, TimeUnit unit) {
134            try {
135                return subscribedLatch.await(timeout, unit);
136            } catch (InterruptedException e) {
137                Thread.currentThread().interrupt();
138                throw new NuxeoException(e);
139            }
140        }
141
142        /**
143         * To be called from a new thread to do the actual Redis subscription and to dispatch messages.
144         */
145        public void run() {
146            log.debug("Subscribing to: " + pattern);
147            // we can't do service lookup during startup here because we're in a separate thread
148            RedisExecutor redisExecutor = this.redisExecutor;
149            this.redisExecutor = null;
150            redisExecutor.psubscribe(this, pattern);
151        }
152
153        /**
154         * To be called from the main thread to stop the subscription.
155         */
156        public void close() {
157            stop = true;
158            // send an empty message so that the dispatcher thread can be woken up and stop
159            publish("", new byte[0]);
160        }
161
162        @Override
163        public void onPSubscribe(String pattern, int subscribedChannels) {
164            subscribedLatch.countDown();
165            if (log.isDebugEnabled()) {
166                log.debug("Subscribed to: " + pattern);
167            }
168        }
169
170        public void onMessage(String channel, byte[] message) {
171            if (message == null) {
172                message = new byte[0];
173            }
174            if (log.isTraceEnabled()) {
175                log.trace("Message received from channel: " + channel + " (" + message.length + " bytes)");
176            }
177            String topic = channel.substring(namespace.length());
178            // localPublish needs to be called in a different thread,
179            // so that if a subscriber calls Redis it doesn't reuse our current Redis connection
180            // which can only be used for subscribe/unsubscribe/ping commands.
181            final byte[] finalMessage = message;
182            THREAD_POOL.execute(() -> localPublish(topic, finalMessage));
183        }
184
185        public void onPMessage(String pattern, String channel, byte[] message) {
186            onMessage(channel, message);
187        }
188
189        @Override
190        public void proceed(Client client, String... channels) {
191            client.subscribe(channels);
192            flush(client);
193            processBinary(client);
194        }
195
196        @Override
197        public void proceedWithPatterns(Client client, String... patterns) {
198            client.psubscribe(patterns);
199            flush(client);
200            processBinary(client);
201        }
202
203        // stupid Jedis has a protected flush method
204        protected void flush(Client client) {
205            try {
206                MethodUtils.invokeMethod(client, true, "flush");
207            } catch (ReflectiveOperationException e) {
208                throw new NuxeoException(e);
209            }
210        }
211
212        // patched process() to pass the raw binary message to onMessage and onPMessage
213        protected void processBinary(Client client) {
214            for (;;) {
215                List<Object> reply = client.getRawObjectMultiBulkReply();
216                if (stop) {
217                    return;
218                }
219                Object type = reply.get(0);
220                if (!(type instanceof byte[])) {
221                    throw new JedisException("Unknown message type: " + type);
222                }
223                byte[] btype = (byte[]) type;
224                if (Arrays.equals(MESSAGE.raw, btype)) {
225                    byte[] bchannel = (byte[]) reply.get(1);
226                    byte[] bmesg = (byte[]) reply.get(2);
227                    onMessage(toString(bchannel), bmesg);
228                } else if (Arrays.equals(PMESSAGE.raw, btype)) {
229                    byte[] bpattern = (byte[]) reply.get(1);
230                    byte[] bchannel = (byte[]) reply.get(2);
231                    byte[] bmesg = (byte[]) reply.get(3);
232                    onPMessage(toString(bpattern), toString(bchannel), bmesg);
233                } else if (Arrays.equals(SUBSCRIBE.raw, btype)) {
234                    byte[] bchannel = (byte[]) reply.get(1);
235                    onSubscribe(toString(bchannel), 0);
236                } else if (Arrays.equals(PSUBSCRIBE.raw, btype)) {
237                    byte[] bpattern = (byte[]) reply.get(1);
238                    onPSubscribe(toString(bpattern), 0);
239                } else if (Arrays.equals(UNSUBSCRIBE.raw, btype)) {
240                    byte[] bchannel = (byte[]) reply.get(1);
241                    onUnsubscribe(toString(bchannel), 0);
242                } else if (Arrays.equals(PUNSUBSCRIBE.raw, btype)) {
243                    byte[] bpattern = (byte[]) reply.get(1);
244                    onPUnsubscribe(toString(bpattern), 0);
245                } else {
246                    throw new JedisException("Unknown message: " + toString(btype));
247                }
248            }
249        }
250
251        protected String toString(byte[] bytes) {
252            return bytes == null ? null : SafeEncoder.encode(bytes);
253        }
254
255    }
256
257    // ===== PubSubService =====
258
259    @Override
260    public void publish(String topic, byte[] message) {
261        String channel = namespace + topic;
262        byte[] bchannel = SafeEncoder.encode(channel);
263        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
264        if (redisExecutor != null) {
265            redisExecutor.execute(jedis -> jedis.publish(bchannel, message));
266        }
267    }
268
269}