001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 024import org.nuxeo.ecm.core.work.WorkHolder; 025import org.nuxeo.ecm.core.work.api.Work; 026 027import java.io.IOException; 028import java.net.ConnectException; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.locks.Condition; 034import java.util.concurrent.locks.Lock; 035import java.util.concurrent.locks.ReentrantLock; 036 037import redis.clients.jedis.exceptions.JedisConnectionException; 038 039/** 040 * Redis-based {@link BlockingQueue}. 041 * <p> 042 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}. 043 * 044 * @since 5.8 045 */ 046public class RedisBlockingQueue extends NuxeoBlockingQueue { 047 048 private static final Log log = LogFactory.getLog(RedisBlockingQueue.class); 049 050 // this is so that we don't spam the logs with too many errors 051 private static final long LOG_INTERVAL = 1000 * 10; // 10s 052 053 private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0); 054 055 private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0); 056 057 private static final int REMOTE_POLL_INTERVAL_MS = 1000; 058 059 private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200; 060 061 protected final String queueId; 062 063 protected final RedisWorkQueuing queuing; 064 065 protected final Lock lock = new ReentrantLock(); 066 protected final Condition notEmpty = lock.newCondition(); 067 068 public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) { 069 this.queueId = queueId; 070 this.queuing = queuing; 071 } 072 073 @Override 074 public int getQueueSize() { 075 return queuing.getScheduledSize(queueId); 076 } 077 078 @Override 079 public Runnable take() throws InterruptedException { 080 for (; ; ) { 081 Runnable r = poll(1, TimeUnit.DAYS); 082 if (r != null) { 083 return r; 084 } 085 } 086 } 087 088 @Override 089 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 090 long nanos = unit.toNanos(timeout); 091 nanos = awaitActivation(nanos); 092 if (nanos <= 0) { 093 return null; 094 } 095 long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos); 096 for (; ; ) { 097 Runnable r = poll(); 098 if (r != null) { 099 return r; 100 } 101 if (timeUntil(end) == 0) { 102 return null; 103 } 104 lock.lock(); 105 try { 106 // wake up if our instance has submitted a new job or wait 107 notEmpty.await(getRemotePollInterval(), TimeUnit.MILLISECONDS); 108 } finally { 109 lock.unlock(); 110 } 111 112 } 113 } 114 115 private int getRemotePollInterval() { 116 // add some randomness so we don't generate periodic spike when all workers are starving 117 return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS, 118 REMOTE_POLL_INTERVAL_STDEV_MS); 119 } 120 121 @Override 122 public void putElement(Runnable r) { 123 Work work = WorkHolder.getWork(r); 124 lock.lock(); 125 try { 126 queuing.addScheduledWork(queueId, work); 127 notEmpty.signal(); 128 } catch (IOException e) { 129 log.error("Failed to add Work: " + work, e); 130 throw new RuntimeException(e); 131 } finally { 132 lock.unlock(); 133 } 134 } 135 136 @Override 137 public Runnable pollElement() { 138 try { 139 Work work = queuing.getWorkFromQueue(queueId); 140 return work == null ? null : new WorkHolder(work); 141 } catch (IOException e) { 142 if (delayExpired(LAST_IO_EXCEPTION)) { 143 // log full stacktrace 144 log.error(e.getMessage(), e); 145 } 146 // for io errors make poll return no result 147 return null; 148 } catch (JedisConnectionException e) { 149 if (delayExpired(LAST_CONNECTION_EXCEPTION)) { 150 Throwable cause = e.getCause(); 151 if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) { 152 log.error(e.getMessage() + ": " + cause.getMessage()); 153 log.debug(e.getMessage(), e); 154 } else { 155 // log full stacktrace 156 log.error(e.getMessage(), e); 157 } 158 } 159 // for connection errors make poll return no result 160 return null; 161 } 162 } 163 164 protected static boolean delayExpired(AtomicLong atomic) { 165 long now = System.currentTimeMillis(); 166 long last = atomic.get(); 167 if (now > last + LOG_INTERVAL) { 168 if (atomic.compareAndSet(last, now)) { 169 return true; 170 } // else some other thread beat us to it 171 } 172 return false; 173 } 174 175}