001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.ecm.core.redis;
017
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021
022import redis.clients.jedis.Client;
023import redis.clients.jedis.Jedis;
024import redis.clients.jedis.JedisMonitor;
025import redis.clients.jedis.exceptions.JedisConnectionException;
026import redis.clients.jedis.exceptions.JedisException;
027import redis.clients.util.Pool;
028
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031
032public class RedisPoolExecutor extends RedisAbstractExecutor {
033
034    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);
035    private Thread monitorThread;
036    protected Pool<Jedis> pool;
037
038    protected final ThreadLocal<Jedis> holder = new ThreadLocal<>();
039
040    public RedisPoolExecutor(Pool<Jedis> pool) {
041        this.pool = pool;
042    }
043
044    @Override
045    public <T> T execute(RedisCallable<T> callable) throws JedisException {
046        { // re-entrance
047            Jedis jedis = holder.get();
048            if (jedis != null) {
049                return callable.call(jedis);
050            }
051        }
052        if (monitorThread != null) {
053            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
054                    pool.getNumActive(), pool.getNumIdle()));
055        }
056        Jedis jedis = pool.getResource();
057        if (monitorThread != null) {
058            log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
059        }
060        holder.set(jedis);
061        boolean brokenResource = false;
062        try {
063            return callable.call(jedis);
064        } catch (JedisConnectionException cause) {
065            brokenResource = true;
066            throw cause;
067        } finally {
068            holder.remove();
069            // a disconnected resournce must be marked as broken
070            // this happens when the monitoring is stopped
071            if (brokenResource || !jedis.isConnected()) {
072                pool.returnBrokenResource(jedis);
073            } else {
074                pool.returnResource(jedis);
075            }
076        }
077
078    }
079
080    @Override
081    public Pool<Jedis> getPool() {
082        return pool;
083    }
084
085    @Override
086    public void startMonitor() {
087        CountDownLatch monitorLatch = new CountDownLatch(1);
088        monitorThread = new Thread(new Runnable() {
089            @Override
090            public void run() {
091                log.debug("Starting monitor thread");
092                execute(jedis -> {
093                    jedis.monitor(new JedisMonitor() {
094                        @Override
095                        public void proceed(Client client) {
096                            monitorLatch.countDown();
097                            super.proceed(client);
098                        }
099
100                        @Override
101                        public void onCommand(String command) {
102                            if (Thread.currentThread().isInterrupted()) {
103                                // The only way to get out of this thread
104                                jedis.disconnect();
105                            } else {
106                                log.debug(command);
107                            }
108                        }
109                    });
110                    log.debug("Monitor thread stopped");
111                    return null;
112                });
113            }
114        });
115        monitorThread.setName("Nuxeo-Redis-Monitor");
116        monitorThread.start();
117        try {
118            if (! monitorLatch.await(5, TimeUnit.SECONDS)) {
119                log.error("Failed to init Redis moniotring");
120            }
121        } catch (InterruptedException e) {
122            Thread.currentThread().interrupt();
123            throw new RuntimeException(e);
124        }
125    }
126
127    @Override
128    public void stopMonitor() {
129        if (monitorThread != null) {
130            log.debug("Stoping monitor");
131            monitorThread.interrupt();
132            monitorThread = null;
133        }
134    }
135
136}