001/* 002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.ecm.core.redis; 017 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021 022import redis.clients.jedis.Client; 023import redis.clients.jedis.Jedis; 024import redis.clients.jedis.JedisMonitor; 025import redis.clients.jedis.exceptions.JedisConnectionException; 026import redis.clients.jedis.exceptions.JedisException; 027import redis.clients.util.Pool; 028 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031 032public class RedisPoolExecutor extends RedisAbstractExecutor { 033 034 private static final Log log = LogFactory.getLog(RedisPoolExecutor.class); 035 private Thread monitorThread; 036 protected Pool<Jedis> pool; 037 038 protected final ThreadLocal<Jedis> holder = new ThreadLocal<>(); 039 040 public RedisPoolExecutor(Pool<Jedis> pool) { 041 this.pool = pool; 042 } 043 044 @SuppressWarnings("resource") // Jedis resource from Pool<Jedis> must not be closed, just returned 045 @Override 046 public <T> T execute(RedisCallable<T> callable) throws JedisException { 047 { // re-entrance 048 Jedis jedis = holder.get(); 049 if (jedis != null) { 050 return callable.call(jedis); 051 } 052 } 053 if (monitorThread != null) { 054 log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s", 055 pool.getNumActive(), pool.getNumIdle())); 056 } 057 Jedis jedis = pool.getResource(); 058 if (monitorThread != null) { 059 log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort()); 060 } 061 holder.set(jedis); 062 boolean brokenResource = false; 063 try { 064 return callable.call(jedis); 065 } catch (JedisConnectionException cause) { 066 brokenResource = true; 067 throw cause; 068 } finally { 069 holder.remove(); 070 // a disconnected resournce must be marked as broken 071 // this happens when the monitoring is stopped 072 if (brokenResource || !jedis.isConnected()) { 073 pool.returnBrokenResource(jedis); 074 } else { 075 pool.returnResource(jedis); 076 } 077 } 078 079 } 080 081 @Override 082 public Pool<Jedis> getPool() { 083 return pool; 084 } 085 086 @Override 087 public void startMonitor() { 088 CountDownLatch monitorLatch = new CountDownLatch(1); 089 monitorThread = new Thread(new Runnable() { 090 @Override 091 public void run() { 092 log.debug("Starting monitor thread"); 093 execute(jedis -> { 094 jedis.monitor(new JedisMonitor() { 095 @Override 096 public void proceed(Client client) { 097 monitorLatch.countDown(); 098 super.proceed(client); 099 } 100 101 @Override 102 public void onCommand(String command) { 103 if (Thread.currentThread().isInterrupted()) { 104 // The only way to get out of this thread 105 jedis.disconnect(); 106 } else { 107 log.debug(command); 108 } 109 } 110 }); 111 log.debug("Monitor thread stopped"); 112 return null; 113 }); 114 } 115 }); 116 monitorThread.setName("Nuxeo-Redis-Monitor"); 117 monitorThread.start(); 118 try { 119 if (! monitorLatch.await(5, TimeUnit.SECONDS)) { 120 log.error("Failed to init Redis moniotring"); 121 } 122 } catch (InterruptedException e) { 123 Thread.currentThread().interrupt(); 124 throw new RuntimeException(e); 125 } 126 } 127 128 @Override 129 public void stopMonitor() { 130 if (monitorThread != null) { 131 log.debug("Stoping monitor"); 132 monitorThread.interrupt(); 133 monitorThread = null; 134 } 135 } 136 137}