001/* 002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.ecm.core.redis; 017 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import redis.clients.jedis.Jedis; 022import redis.clients.jedis.JedisMonitor; 023import redis.clients.jedis.exceptions.JedisConnectionException; 024import redis.clients.jedis.exceptions.JedisException; 025import redis.clients.util.Pool; 026 027public class RedisPoolExecutor implements RedisExecutor { 028 029 private static final Log log = LogFactory.getLog(RedisPoolExecutor.class); 030 private Thread monitorThread; 031 protected Pool<Jedis> pool; 032 033 public RedisPoolExecutor(Pool<Jedis> pool) { 034 this.pool = pool; 035 } 036 037 @Override 038 public <T> T execute(RedisCallable<T> callable) throws JedisException { 039 if (monitorThread != null) { 040 log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s", 041 pool.getNumActive(), pool.getNumIdle())); 042 } 043 Jedis jedis = pool.getResource(); 044 if (monitorThread != null) { 045 log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort()); 046 } 047 boolean brokenResource = false; 048 try { 049 return callable.call(jedis); 050 } catch (JedisConnectionException cause) { 051 brokenResource = true; 052 throw cause; 053 } finally { 054 // a disconnected resournce must be marked as broken 055 // this happens when the monitoring is stopped 056 if (brokenResource || !jedis.isConnected()) { 057 pool.returnBrokenResource(jedis); 058 } else { 059 pool.returnResource(jedis); 060 } 061 } 062 063 } 064 065 @Override 066 public Pool<Jedis> getPool() { 067 return pool; 068 } 069 070 @Override 071 public boolean supportPipelined() { 072 return true; 073 } 074 075 @Override 076 public void startMonitor() { 077 monitorThread = new Thread(new Runnable() { 078 @Override 079 public void run() { 080 log.debug("Starting monitor thread"); 081 execute(jedis -> { 082 jedis.monitor(new JedisMonitor() { 083 public void onCommand(String command) { 084 if (Thread.currentThread().isInterrupted()) { 085 // The only way to get out of this thread 086 jedis.disconnect(); 087 } else { 088 log.debug(command); 089 } 090 } 091 }); 092 log.debug("Monitor thread stopped"); 093 return null; 094 }); 095 } 096 }); 097 monitorThread.setName("Nuxeo-Redis-Monitor"); 098 monitorThread.start(); 099 } 100 101 @Override 102 public void stopMonitor() { 103 if (monitorThread != null) { 104 log.debug("Stoping monitor"); 105 monitorThread.interrupt(); 106 monitorThread = null; 107 } 108 } 109 110}