001/* 002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.ecm.core.redis; 017 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021 022import redis.clients.jedis.Client; 023import redis.clients.jedis.Jedis; 024import redis.clients.jedis.JedisMonitor; 025import redis.clients.jedis.exceptions.JedisConnectionException; 026import redis.clients.jedis.exceptions.JedisException; 027import redis.clients.util.Pool; 028 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031 032public class RedisPoolExecutor extends RedisAbstractExecutor { 033 034 private static final Log log = LogFactory.getLog(RedisPoolExecutor.class); 035 private Thread monitorThread; 036 protected Pool<Jedis> pool; 037 038 protected final ThreadLocal<Jedis> holder = new ThreadLocal<>(); 039 040 public RedisPoolExecutor(Pool<Jedis> pool) { 041 this.pool = pool; 042 } 043 044 @Override 045 public <T> T execute(RedisCallable<T> callable) throws JedisException { 046 { // re-entrance 047 Jedis jedis = holder.get(); 048 if (jedis != null) { 049 return callable.call(jedis); 050 } 051 } 052 if (monitorThread != null) { 053 log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s", 054 pool.getNumActive(), pool.getNumIdle())); 055 } 056 Jedis jedis = pool.getResource(); 057 if (monitorThread != null) { 058 log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort()); 059 } 060 holder.set(jedis); 061 boolean brokenResource = false; 062 try { 063 return callable.call(jedis); 064 } catch (JedisConnectionException cause) { 065 brokenResource = true; 066 throw cause; 067 } finally { 068 holder.remove(); 069 // a disconnected resournce must be marked as broken 070 // this happens when the monitoring is stopped 071 if (brokenResource || !jedis.isConnected()) { 072 pool.returnBrokenResource(jedis); 073 } else { 074 pool.returnResource(jedis); 075 } 076 } 077 078 } 079 080 @Override 081 public Pool<Jedis> getPool() { 082 return pool; 083 } 084 085 @Override 086 public void startMonitor() { 087 CountDownLatch monitorLatch = new CountDownLatch(1); 088 monitorThread = new Thread(new Runnable() { 089 @Override 090 public void run() { 091 log.debug("Starting monitor thread"); 092 execute(jedis -> { 093 jedis.monitor(new JedisMonitor() { 094 @Override 095 public void proceed(Client client) { 096 monitorLatch.countDown(); 097 super.proceed(client); 098 } 099 100 @Override 101 public void onCommand(String command) { 102 if (Thread.currentThread().isInterrupted()) { 103 // The only way to get out of this thread 104 jedis.disconnect(); 105 } else { 106 log.debug(command); 107 } 108 } 109 }); 110 log.debug("Monitor thread stopped"); 111 return null; 112 }); 113 } 114 }); 115 monitorThread.setName("Nuxeo-Redis-Monitor"); 116 monitorThread.start(); 117 try { 118 if (! monitorLatch.await(5, TimeUnit.SECONDS)) { 119 log.error("Failed to init Redis moniotring"); 120 } 121 } catch (InterruptedException e) { 122 Thread.currentThread().interrupt(); 123 throw new RuntimeException(e); 124 } 125 } 126 127 @Override 128 public void stopMonitor() { 129 if (monitorThread != null) { 130 log.debug("Stoping monitor"); 131 monitorThread.interrupt(); 132 monitorThread = null; 133 } 134 } 135 136}