001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.IOException; 022import java.net.ConnectException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ThreadLocalRandom; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicLong; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.Lock; 029import java.util.concurrent.locks.ReentrantLock; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 034import org.nuxeo.ecm.core.work.WorkHolder; 035import org.nuxeo.ecm.core.work.api.Work; 036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 037 038import redis.clients.jedis.exceptions.JedisConnectionException; 039 040/** 041 * Redis-based {@link BlockingQueue}. 042 * <p> 043 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}. 044 * 045 * @since 5.8 046 */ 047public class RedisBlockingQueue extends NuxeoBlockingQueue { 048 049 private static final Log log = LogFactory.getLog(RedisBlockingQueue.class); 050 051 // this is so that we don't spam the logs with too many errors 052 private static final long LOG_INTERVAL = 1000 * 10; // 10s 053 054 private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0); 055 056 private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0); 057 058 private static final int REMOTE_POLL_INTERVAL_MS = 1000; 059 060 private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200; 061 062 protected final RedisWorkQueuing queuing; 063 064 protected final Lock lock = new ReentrantLock(); 065 protected final Condition notEmpty = lock.newCondition(); 066 067 public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) { 068 super(queueId, queuing); 069 this.queuing = queuing; 070 } 071 072 @Override 073 protected WorkQueueMetrics metrics() { 074 return queuing.metrics(queueId); 075 } 076 077 @Override 078 public int getQueueSize() { 079 return queuing.metrics(queueId).scheduled.intValue(); 080 } 081 082 @Override 083 public Runnable take() throws InterruptedException { 084 for (; ; ) { 085 Runnable r = poll(1, TimeUnit.DAYS); 086 if (r != null) { 087 return r; 088 } 089 } 090 } 091 092 @Override 093 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 094 long nanos = unit.toNanos(timeout); 095 nanos = awaitActivation(nanos); 096 if (nanos <= 0) { 097 return null; 098 } 099 long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos); 100 for (; ; ) { 101 Runnable r = poll(); 102 if (r != null) { 103 return r; 104 } 105 if (timeUntil(end) == 0) { 106 return null; 107 } 108 lock.lock(); 109 try { 110 // wake up if our instance has submitted a new job or wait 111 notEmpty.await(getRemotePollInterval(), TimeUnit.MILLISECONDS); 112 } finally { 113 lock.unlock(); 114 } 115 116 } 117 } 118 119 private int getRemotePollInterval() { 120 // add some randomness so we don't generate periodic spike when all workers are starving 121 return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS, 122 REMOTE_POLL_INTERVAL_STDEV_MS); 123 } 124 125 @Override 126 public void putElement(Runnable r) { 127 Work work = WorkHolder.getWork(r); 128 lock.lock(); 129 try { 130 queuing.workSetScheduled(queueId, work); 131 notEmpty.signal(); 132 } catch (IOException e) { 133 log.error("Failed to add Work: " + work, e); 134 throw new RuntimeException(e); 135 } finally { 136 lock.unlock(); 137 } 138 } 139 140 @Override 141 public Runnable pollElement() { 142 try { 143 Work work = queuing.getWorkFromQueue(queueId); 144 return work == null ? null : new WorkHolder(work); 145 } catch (IOException e) { 146 if (delayExpired(LAST_IO_EXCEPTION)) { 147 // log full stacktrace 148 log.error(e.getMessage(), e); 149 } 150 // for io errors make poll return no result 151 return null; 152 } catch (JedisConnectionException e) { 153 if (delayExpired(LAST_CONNECTION_EXCEPTION)) { 154 Throwable cause = e.getCause(); 155 if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) { 156 log.error(e.getMessage() + ": " + cause.getMessage()); 157 log.debug(e.getMessage(), e); 158 } else { 159 // log full stacktrace 160 log.error(e.getMessage(), e); 161 } 162 } 163 // for connection errors make poll return no result 164 return null; 165 } 166 } 167 168 protected static boolean delayExpired(AtomicLong atomic) { 169 long now = System.currentTimeMillis(); 170 long last = atomic.get(); 171 if (now > last + LOG_INTERVAL) { 172 if (atomic.compareAndSet(last, now)) { 173 return true; 174 } // else some other thread beat us to it 175 } 176 return false; 177 } 178 179}