/*
 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016package org.nuxeo.ecm.core.redis;
017
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import redis.clients.jedis.Jedis;
022import redis.clients.jedis.JedisMonitor;
023import redis.clients.jedis.exceptions.JedisConnectionException;
024import redis.clients.jedis.exceptions.JedisException;
025import redis.clients.util.Pool;
026
027public class RedisPoolExecutor implements RedisExecutor {
028
029    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);
030    private Thread monitorThread;
031    protected Pool<Jedis> pool;
032
033    public RedisPoolExecutor(Pool<Jedis> pool) {
034        this.pool = pool;
035    }
036
037    @Override
038    public <T> T execute(RedisCallable<T> callable) throws JedisException {
039        if (monitorThread != null) {
040            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
041                    pool.getNumActive(), pool.getNumIdle()));
042        }
043        Jedis jedis = pool.getResource();
044        if (monitorThread != null) {
045            log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
046        }
047        boolean brokenResource = false;
048        try {
049            return callable.call(jedis);
050        } catch (JedisConnectionException cause) {
051            brokenResource = true;
052            throw cause;
053        } finally {
054            // a disconnected resournce must be marked as broken
055            // this happens when the monitoring is stopped
056            if (brokenResource || !jedis.isConnected()) {
057                pool.returnBrokenResource(jedis);
058            } else {
059                pool.returnResource(jedis);
060            }
061        }
062
063    }
064
065    @Override
066    public Pool<Jedis> getPool() {
067        return pool;
068    }
069
070    @Override
071    public boolean supportPipelined() {
072        return true;
073    }
074
075    @Override
076    public void startMonitor() {
077        monitorThread = new Thread(new Runnable() {
078            @Override
079            public void run() {
080                log.debug("Starting monitor thread");
081                execute(jedis -> {
082                    jedis.monitor(new JedisMonitor() {
083                        public void onCommand(String command) {
084                            if (Thread.currentThread().isInterrupted()) {
085                                // The only way to get out of this thread
086                                jedis.disconnect();
087                            } else {
088                                log.debug(command);
089                            }
090                        }
091                    });
092                    log.debug("Monitor thread stopped");
093                    return null;
094                });
095            }
096        });
097        monitorThread.setName("Nuxeo-Redis-Monitor");
098        monitorThread.start();
099    }
100
101    @Override
102    public void stopMonitor() {
103        if (monitorThread != null) {
104            log.debug("Stoping monitor");
105            monitorThread.interrupt();
106            monitorThread = null;
107        }
108    }
109
110}