001/* 002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.ecm.core.redis; 017 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import redis.clients.jedis.Client; 022import redis.clients.jedis.Jedis; 023import redis.clients.jedis.JedisMonitor; 024import redis.clients.jedis.exceptions.JedisConnectionException; 025import redis.clients.jedis.exceptions.JedisException; 026import redis.clients.util.Pool; 027 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.TimeUnit; 030 031public class RedisPoolExecutor implements RedisExecutor { 032 033 private static final Log log = LogFactory.getLog(RedisPoolExecutor.class); 034 private Thread monitorThread; 035 protected Pool<Jedis> pool; 036 037 public RedisPoolExecutor(Pool<Jedis> pool) { 038 this.pool = pool; 039 } 040 041 @Override 042 public <T> T execute(RedisCallable<T> callable) throws JedisException { 043 if (monitorThread != null) { 044 log.debug(String.format("Redis pool state before getting a conn: active: %d, idle: %s", 045 pool.getNumActive(), pool.getNumIdle())); 046 } 047 Jedis jedis = pool.getResource(); 048 if (monitorThread != null) { 049 log.debug("Using conn: " + jedis.getClient().getSocket().getLocalPort()); 050 } 051 boolean brokenResource = false; 052 try { 053 return callable.call(jedis); 054 } catch (JedisConnectionException cause) { 055 brokenResource = true; 056 throw cause; 057 } finally { 058 // a disconnected resournce must be marked as broken 059 // this happens when the monitoring is stopped 060 if (brokenResource || !jedis.isConnected()) { 061 pool.returnBrokenResource(jedis); 062 } else { 063 pool.returnResource(jedis); 064 } 065 } 066 067 } 068 069 @Override 070 public Pool<Jedis> getPool() { 071 return pool; 072 } 073 074 @Override 075 public void startMonitor() { 076 CountDownLatch monitorLatch = new CountDownLatch(1); 077 monitorThread = new Thread(new Runnable() { 078 @Override 079 public void run() { 080 log.debug("Starting monitor thread"); 081 execute(jedis -> { 082 jedis.monitor(new JedisMonitor() { 083 @Override 084 public void proceed(Client client) { 085 monitorLatch.countDown(); 086 super.proceed(client); 087 } 088 089 public void onCommand(String command) { 090 if (Thread.currentThread().isInterrupted()) { 091 // The only way to get out of this thread 092 jedis.disconnect(); 093 } else { 094 log.debug(command); 095 } 096 } 097 }); 098 log.debug("Monitor thread stopped"); 099 return null; 100 }); 101 } 102 }); 103 monitorThread.setName("Nuxeo-Redis-Monitor"); 104 monitorThread.start(); 105 try { 106 if (! monitorLatch.await(5, TimeUnit.SECONDS)) { 107 log.error("Failed to init Redis moniotring"); 108 } 109 } catch (InterruptedException e) { 110 Thread.currentThread().interrupt(); 111 throw new RuntimeException(e); 112 } 113 } 114 115 @Override 116 public void stopMonitor() { 117 if (monitorThread != null) { 118 log.debug("Stoping monitor"); 119 monitorThread.interrupt(); 120 monitorThread = null; 121 } 122 } 123 124}