001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.ecm.core.redis;
017
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import redis.clients.jedis.Client;
022import redis.clients.jedis.Jedis;
023import redis.clients.jedis.JedisMonitor;
024import redis.clients.jedis.exceptions.JedisConnectionException;
025import redis.clients.jedis.exceptions.JedisException;
026import redis.clients.util.Pool;
027
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.TimeUnit;
030
031public class RedisPoolExecutor implements RedisExecutor {
032
033    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);
034    private Thread monitorThread;
035    protected Pool<Jedis> pool;
036
037    protected final ThreadLocal<Jedis> holder = new ThreadLocal<>();
038
039    public RedisPoolExecutor(Pool<Jedis> pool) {
040        this.pool = pool;
041    }
042
043    @Override
044    public <T> T execute(RedisCallable<T> callable) throws JedisException {
045        { // re-entrance
046            Jedis jedis = holder.get();
047            if (jedis != null) {
048                return callable.call(jedis);
049            }
050        }
051        if (monitorThread != null) {
052            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
053                    pool.getNumActive(), pool.getNumIdle()));
054        }
055        Jedis jedis = pool.getResource();
056        if (monitorThread != null) {
057            log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
058        }
059        holder.set(jedis);
060        boolean brokenResource = false;
061        try {
062            return callable.call(jedis);
063        } catch (JedisConnectionException cause) {
064            brokenResource = true;
065            throw cause;
066        } finally {
067            holder.remove();
068            // a disconnected resournce must be marked as broken
069            // this happens when the monitoring is stopped
070            if (brokenResource || !jedis.isConnected()) {
071                pool.returnBrokenResource(jedis);
072            } else {
073                pool.returnResource(jedis);
074            }
075        }
076
077    }
078
079    @Override
080    public Pool<Jedis> getPool() {
081        return pool;
082    }
083
084    @Override
085    public void startMonitor() {
086        CountDownLatch monitorLatch = new CountDownLatch(1);
087        monitorThread = new Thread(new Runnable() {
088            @Override
089            public void run() {
090                log.debug("Starting monitor thread");
091                execute(jedis -> {
092                    jedis.monitor(new JedisMonitor() {
093                        @Override
094                        public void proceed(Client client) {
095                            monitorLatch.countDown();
096                            super.proceed(client);
097                        }
098
099                        @Override
100                        public void onCommand(String command) {
101                            if (Thread.currentThread().isInterrupted()) {
102                                // The only way to get out of this thread
103                                jedis.disconnect();
104                            } else {
105                                log.debug(command);
106                            }
107                        }
108                    });
109                    log.debug("Monitor thread stopped");
110                    return null;
111                });
112            }
113        });
114        monitorThread.setName("Nuxeo-Redis-Monitor");
115        monitorThread.start();
116        try {
117            if (! monitorLatch.await(5, TimeUnit.SECONDS)) {
118                log.error("Failed to init Redis moniotring");
119            }
120        } catch (InterruptedException e) {
121            Thread.currentThread().interrupt();
122            throw new RuntimeException(e);
123        }
124    }
125
126    @Override
127    public void stopMonitor() {
128        if (monitorThread != null) {
129            log.debug("Stoping monitor");
130            monitorThread.interrupt();
131            monitorThread = null;
132        }
133    }
134
135}