001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.ecm.core.redis;
017
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021
022import redis.clients.jedis.Client;
023import redis.clients.jedis.Jedis;
024import redis.clients.jedis.JedisMonitor;
025import redis.clients.jedis.exceptions.JedisConnectionException;
026import redis.clients.jedis.exceptions.JedisException;
027import redis.clients.util.Pool;
028
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031
032public class RedisPoolExecutor extends RedisAbstractExecutor {
033
034    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);
035    private Thread monitorThread;
036    protected Pool<Jedis> pool;
037
038    protected final ThreadLocal<Jedis> holder = new ThreadLocal<>();
039
040    public RedisPoolExecutor(Pool<Jedis> pool) {
041        this.pool = pool;
042    }
043
044    @SuppressWarnings("resource") // Jedis resource from Pool<Jedis> must not be closed, just returned
045    @Override
046    public <T> T execute(RedisCallable<T> callable) throws JedisException {
047        { // re-entrance
048            Jedis jedis = holder.get();
049            if (jedis != null) {
050                return callable.call(jedis);
051            }
052        }
053        if (monitorThread != null) {
054            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
055                    pool.getNumActive(), pool.getNumIdle()));
056        }
057        Jedis jedis = pool.getResource();
058        if (monitorThread != null) {
059            log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
060        }
061        holder.set(jedis);
062        boolean brokenResource = false;
063        try {
064            return callable.call(jedis);
065        } catch (JedisConnectionException cause) {
066            brokenResource = true;
067            throw cause;
068        } finally {
069            holder.remove();
070            // a disconnected resournce must be marked as broken
071            // this happens when the monitoring is stopped
072            if (brokenResource || !jedis.isConnected()) {
073                pool.returnBrokenResource(jedis);
074            } else {
075                pool.returnResource(jedis);
076            }
077        }
078
079    }
080
081    @Override
082    public Pool<Jedis> getPool() {
083        return pool;
084    }
085
086    @Override
087    public void startMonitor() {
088        CountDownLatch monitorLatch = new CountDownLatch(1);
089        monitorThread = new Thread(new Runnable() {
090            @Override
091            public void run() {
092                log.debug("Starting monitor thread");
093                execute(jedis -> {
094                    jedis.monitor(new JedisMonitor() {
095                        @Override
096                        public void proceed(Client client) {
097                            monitorLatch.countDown();
098                            super.proceed(client);
099                        }
100
101                        @Override
102                        public void onCommand(String command) {
103                            if (Thread.currentThread().isInterrupted()) {
104                                // The only way to get out of this thread
105                                jedis.disconnect();
106                            } else {
107                                log.debug(command);
108                            }
109                        }
110                    });
111                    log.debug("Monitor thread stopped");
112                    return null;
113                });
114            }
115        });
116        monitorThread.setName("Nuxeo-Redis-Monitor");
117        monitorThread.start();
118        try {
119            if (! monitorLatch.await(5, TimeUnit.SECONDS)) {
120                log.error("Failed to init Redis moniotring");
121            }
122        } catch (InterruptedException e) {
123            Thread.currentThread().interrupt();
124            throw new RuntimeException(e);
125        }
126    }
127
128    @Override
129    public void stopMonitor() {
130        if (monitorThread != null) {
131            log.debug("Stoping monitor");
132            monitorThread.interrupt();
133            monitorThread = null;
134        }
135    }
136
137}