001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.ecm.core.redis;
017
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import redis.clients.jedis.Client;
022import redis.clients.jedis.Jedis;
023import redis.clients.jedis.JedisMonitor;
024import redis.clients.jedis.exceptions.JedisConnectionException;
025import redis.clients.jedis.exceptions.JedisException;
026import redis.clients.util.Pool;
027
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.TimeUnit;
030
031public class RedisPoolExecutor implements RedisExecutor {
032
033    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);
034    private Thread monitorThread;
035    protected Pool<Jedis> pool;
036
037    public RedisPoolExecutor(Pool<Jedis> pool) {
038        this.pool = pool;
039    }
040
041    @Override
042    public <T> T execute(RedisCallable<T> callable) throws JedisException {
043        if (monitorThread != null) {
044            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
045                    pool.getNumActive(), pool.getNumIdle()));
046        }
047        Jedis jedis = pool.getResource();
048        if (monitorThread != null) {
049            log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
050        }
051        boolean brokenResource = false;
052        try {
053            return callable.call(jedis);
054        } catch (JedisConnectionException cause) {
055            brokenResource = true;
056            throw cause;
057        } finally {
058            // a disconnected resournce must be marked as broken
059            // this happens when the monitoring is stopped
060            if (brokenResource || !jedis.isConnected()) {
061                pool.returnBrokenResource(jedis);
062            } else {
063                pool.returnResource(jedis);
064            }
065        }
066
067    }
068
069    @Override
070    public Pool<Jedis> getPool() {
071        return pool;
072    }
073
074    @Override
075    public void startMonitor() {
076        CountDownLatch monitorLatch = new CountDownLatch(1);
077        monitorThread = new Thread(new Runnable() {
078            @Override
079            public void run() {
080                log.debug("Starting monitor thread");
081                execute(jedis -> {
082                    jedis.monitor(new JedisMonitor() {
083                        @Override
084                        public void proceed(Client client) {
085                            monitorLatch.countDown();
086                            super.proceed(client);
087                        }
088
089                        public void onCommand(String command) {
090                            if (Thread.currentThread().isInterrupted()) {
091                                // The only way to get out of this thread
092                                jedis.disconnect();
093                            } else {
094                                log.debug(command);
095                            }
096                        }
097                    });
098                    log.debug("Monitor thread stopped");
099                    return null;
100                });
101            }
102        });
103        monitorThread.setName("Nuxeo-Redis-Monitor");
104        monitorThread.start();
105        try {
106            if (! monitorLatch.await(5, TimeUnit.SECONDS)) {
107                log.error("Failed to init Redis moniotring");
108            }
109        } catch (InterruptedException e) {
110            Thread.currentThread().interrupt();
111            throw new RuntimeException(e);
112        }
113    }
114
115    @Override
116    public void stopMonitor() {
117        if (monitorThread != null) {
118            log.debug("Stoping monitor");
119            monitorThread.interrupt();
120            monitorThread = null;
121        }
122    }
123
124}