/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.api.Work;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * Redis-based {@link BlockingQueue}.
 * <p>
 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
 * <p>
 * Scheduled work is stored and fetched through the {@link RedisWorkQueuing} passed to the constructor. An in-JVM
 * {@link Condition} is signalled by {@link #putElement} so that a thread waiting in {@link #poll(long, TimeUnit)}
 * can retry early; that wait is bounded, so the queue is re-polled periodically even without a signal.
 *
 * @since 5.8
 */
public class RedisBlockingQueue extends NuxeoBlockingQueue {

    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);

    // this is so that we don't spam the logs with too many errors
    private static final long LOG_INTERVAL = 1000 * 10; // 10s

    /** Longest single wait on {@link #notEmpty} before the queue is polled again, in milliseconds. */
    private static final long MAX_WAIT_MILLIS = 1000;

    /** Time (epoch millis) at which an {@link IOException} was last logged by {@link #pollElement()}. */
    private static final AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);

    /** Time (epoch millis) at which a {@link JedisConnectionException} was last logged by {@link #pollElement()}. */
    private static final AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);

    /** Identifier handed to {@link #queuing} on every call to designate this queue. */
    protected final String queueId;

    /** Backend through which scheduled work is added, removed and counted. */
    protected final RedisWorkQueuing queuing;

    /** Guards {@link #notEmpty}. */
    protected final Lock lock = new ReentrantLock();

    /** Signalled by {@link #putElement} after a work has been added. */
    protected final Condition notEmpty = lock.newCondition();

    /**
     * Creates a queue bound to the given identifier.
     *
     * @param queueId the identifier passed to {@code queuing} on each operation
     * @param queuing the backend used to store and retrieve scheduled work
     */
    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
        this.queueId = queueId;
        this.queuing = queuing;
    }

    /**
     * Returns the scheduled size reported by the backend for this queue.
     */
    @Override
    public int getQueueSize() {
        return queuing.getScheduledSize(queueId);
    }

    /**
     * Retrieves the next element, retrying {@link #poll(long, TimeUnit)} with a one-day timeout until it yields one.
     *
     * @return the next element, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Runnable take() throws InterruptedException {
        for (;;) {
            Runnable r = poll(1, TimeUnit.DAYS);
            if (r != null) {
                return r;
            }
        }
    }

    /**
     * Retrieves the next element, waiting up to the given timeout for one to become available.
     * <p>
     * The value returned by {@code awaitActivation} is treated as the remaining time budget in nanoseconds; when it
     * is not positive this method returns {@code null} without polling.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the unit of {@code timeout}
     * @return the next element, or {@code null} if none was obtained before the timeout elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        nanos = awaitActivation(nanos);
        if (nanos <= 0) {
            return null;
        }
        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
        for (;;) {
            Runnable r = poll();
            if (r != null) {
                return r;
            }
            if (timeUntil(end) == 0) {
                return null;
            }
            // Never wait past the deadline: cap the wait at whatever is left of the timeout so that
            // short timeouts are not stretched to a full polling interval.
            long remainingMillis = Math.max(0, end - System.currentTimeMillis());
            long waitMillis = Math.min(remainingMillis, MAX_WAIT_MILLIS);
            lock.lock();
            try {
                // The result is deliberately ignored: whether signalled or timed out, we poll again.
                notEmpty.await(waitMillis, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Adds the work held by the given runnable to the backend, then signals one waiting poller.
     *
     * @param r the runnable from which the {@link Work} is extracted via {@link WorkHolder#getWork}
     * @throws RuntimeException wrapping the {@link IOException} if the backend fails to add the work
     */
    @Override
    public void putElement(Runnable r) {
        Work work = WorkHolder.getWork(r);
        lock.lock();
        try {
            queuing.addScheduledWork(queueId, work);
            notEmpty.signal();
        } catch (IOException e) {
            log.error("Failed to add Work: " + work, e);
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the next scheduled work from the backend without waiting.
     * <p>
     * I/O and connection failures are logged (at most once per {@link #LOG_INTERVAL} for each kind) and reported
     * to the caller as "no element".
     *
     * @return a {@link WorkHolder} wrapping the removed work, or {@code null} if there was none or the backend
     *         could not be reached
     */
    @Override
    public Runnable pollElement() {
        try {
            Work work = queuing.removeScheduledWork(queueId);
            if (work == null) {
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("Remove scheduled " + work);
            }
            return new WorkHolder(work);
        } catch (IOException e) {
            if (delayExpired(LAST_IO_EXCEPTION)) {
                // log full stacktrace
                log.error(e.getMessage(), e);
            }
            // for io errors make poll return no result
            return null;
        } catch (JedisConnectionException e) {
            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
                Throwable cause = e.getCause();
                // A Throwable's detail message may be null; guard it so that logging can never
                // throw from inside this handler.
                String causeMessage = cause == null ? null : cause.getMessage();
                if (causeMessage != null && causeMessage.contains(ConnectException.class.getName())) {
                    // connection refused and the like: one concise line, stacktrace only at debug level
                    log.error(e.getMessage() + ": " + causeMessage);
                    log.debug(e.getMessage(), e);
                } else {
                    // log full stacktrace
                    log.error(e.getMessage(), e);
                }
            }
            // for connection errors make poll return no result
            return null;
        }
    }

    /**
     * Tells whether more than {@link #LOG_INTERVAL} milliseconds have passed since the time stored in
     * {@code atomic}, and if so atomically replaces that time with the current one.
     *
     * @param atomic holder of the last time (epoch millis) the guarded action was performed
     * @return {@code true} if the interval has elapsed and this caller won the update; {@code false} if the
     *         interval has not elapsed or another thread updated the holder first
     */
    protected static boolean delayExpired(AtomicLong atomic) {
        long now = System.currentTimeMillis();
        long last = atomic.get();
        if (now > last + LOG_INTERVAL) {
            if (atomic.compareAndSet(last, now)) {
                return true;
            } // else some other thread beat us to it
        }
        return false;
    }

}