/*
 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.nuxeo.ecm.core.redis;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisMonitor;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.Pool;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * {@link RedisExecutor} that runs callables against connections borrowed from a Jedis {@link Pool}.
 * <p>
 * The connection borrowed by the outermost {@link #execute(RedisCallable)} call is bound to the calling thread
 * for the duration of that call, so nested {@code execute} calls made on the same thread reuse it instead of
 * borrowing a second one.
 * <p>
 * An optional monitor thread can be started to log, at debug level, every command reported by the Redis
 * {@code MONITOR} stream. While a monitor is registered, {@code execute} also logs pool usage at debug level.
 */
public class RedisPoolExecutor implements RedisExecutor {

    private static final Log log = LogFactory.getLog(RedisPoolExecutor.class);

    /** Guards the check-then-act sequences that start and stop the monitor thread. */
    private final Object monitorLock = new Object();

    /**
     * The running monitor thread, or {@code null} when monitoring is off. Volatile because it is written by the
     * thread calling {@link #startMonitor()} / {@link #stopMonitor()} and read by every thread calling
     * {@link #execute(RedisCallable)}.
     */
    private volatile Thread monitorThread;

    protected Pool<Jedis> pool;

    /** Connection currently bound to the calling thread by an in-progress {@link #execute(RedisCallable)}. */
    protected final ThreadLocal<Jedis> holder = new ThreadLocal<>();

    /**
     * Creates an executor that borrows its connections from the given pool.
     *
     * @param pool the pool connections are borrowed from and returned to
     */
    public RedisPoolExecutor(Pool<Jedis> pool) {
        this.pool = pool;
    }

    /**
     * Runs the callable with a Redis connection.
     * <p>
     * If the calling thread is already inside an {@code execute} call, the connection bound to the thread is
     * reused. Otherwise a connection is borrowed from the pool, bound to the thread, and given back to the pool
     * when the callable completes. It is given back as broken if the callable threw a
     * {@link JedisConnectionException} or if the connection is no longer connected.
     *
     * @param callable the work to run against the connection
     * @return whatever the callable returns
     * @throws JedisException propagated from the callable or from the pool
     */
    @Override
    public <T> T execute(RedisCallable<T> callable) throws JedisException {
        // re-entrance: reuse the connection already bound to this thread
        Jedis bound = holder.get();
        if (bound != null) {
            return callable.call(bound);
        }
        // read the flag once so both debug statements agree even if the monitor is stopped meanwhile
        boolean monitoring = monitorThread != null;
        if (monitoring) {
            log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s",
                    pool.getNumActive(), pool.getNumIdle()));
        }
        Jedis jedis = pool.getResource();
        boolean brokenResource = false;
        try {
            // everything after the borrow lives inside the try so that the connection is always handed back
            if (monitoring) {
                log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort());
            }
            holder.set(jedis);
            return callable.call(jedis);
        } catch (JedisConnectionException cause) {
            brokenResource = true;
            throw cause;
        } finally {
            holder.remove();
            // a disconnected resource must be marked as broken
            // this happens when the monitoring is stopped
            if (brokenResource || !jedis.isConnected()) {
                pool.returnBrokenResource(jedis);
            } else {
                pool.returnResource(jedis);
            }
        }
    }

    /**
     * Returns the pool this executor borrows its connections from.
     */
    @Override
    public Pool<Jedis> getPool() {
        return pool;
    }

    /**
     * Starts a background thread that logs the commands reported by Redis monitoring at debug level, then waits
     * up to 5 seconds for the monitoring to be set up; an error is logged if that wait times out.
     * <p>
     * Does nothing if a monitor thread started by this executor is still alive: starting a second one would
     * drop the only reference to the first, which could then never be stopped and would keep its connection.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is interrupted
     *             while waiting; the interrupt flag is restored first
     */
    @Override
    public void startMonitor() {
        CountDownLatch monitorLatch = new CountDownLatch(1);
        synchronized (monitorLock) {
            Thread current = monitorThread;
            if (current != null && current.isAlive()) {
                log.debug("Monitor thread already running");
                return;
            }
            Thread thread = new Thread(() -> runMonitor(monitorLatch));
            thread.setName("Nuxeo-Redis-Monitor");
            // The interrupt flag is only inspected when a command is reported, so the thread may outlive
            // stopMonitor(); as a daemon it cannot keep the JVM alive in the meantime.
            thread.setDaemon(true);
            // published before start() so that the monitor's own execute() call sees monitoring as enabled
            monitorThread = thread;
            thread.start();
        }
        try {
            if (!monitorLatch.await(5, TimeUnit.SECONDS)) {
                log.error("Failed to init Redis monitoring");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the monitor thread: borrows a connection through {@link #execute(RedisCallable)} and logs every
     * reported command until the thread is found interrupted, at which point the connection is disconnected.
     *
     * @param ready released once the monitor callback is handed the client, just before commands are processed
     */
    private void runMonitor(CountDownLatch ready) {
        log.debug("Starting monitor thread");
        execute(jedis -> {
            jedis.monitor(new JedisMonitor() {
                @Override
                public void proceed(Client client) {
                    ready.countDown();
                    super.proceed(client);
                }

                @Override
                public void onCommand(String command) {
                    if (Thread.currentThread().isInterrupted()) {
                        // The only way to get out of this thread
                        jedis.disconnect();
                    } else {
                        log.debug(command);
                    }
                }
            });
            log.debug("Monitor thread stopped");
            return null;
        });
    }

    /**
     * Asks the monitor thread, if any, to stop by interrupting it, and forgets it.
     * <p>
     * The thread only notices the interruption the next time a command is reported to it, so it may still be
     * running when this method returns.
     */
    @Override
    public void stopMonitor() {
        synchronized (monitorLock) {
            Thread thread = monitorThread;
            if (thread != null) {
                log.debug("Stopping monitor");
                thread.interrupt();
                monitorThread = null;
            }
        }
    }

}