/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;

import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * Redis-based {@link BlockingQueue}.
 * <p>
 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
 *
 * @since 5.8
 */
public class RedisBlockingQueue extends NuxeoBlockingQueue {

    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);

    // this is so that we don't spam the logs with too many errors
    private static final long LOG_INTERVAL = Duration.ofSeconds(10).toMillis();

    /** Timestamp (ms) of the last logged {@link IOException}, shared by all queues to throttle logging. */
    private static final AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);

    /** Timestamp (ms) of the last logged {@link JedisConnectionException}, shared by all queues. */
    private static final AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);

    /** Base delay between two polls of the remote queue when no local wake-up happens. */
    private static final int REMOTE_POLL_INTERVAL_MS = 1000;

    /** Half-width of the random jitter applied around {@link #REMOTE_POLL_INTERVAL_MS}. */
    private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200;

    protected final RedisWorkQueuing queuing;

    protected final Lock lock = new ReentrantLock();

    /** Signalled by {@link #putElement} so that a local waiter in {@link #poll(long, TimeUnit)} wakes up early. */
    protected final Condition notEmpty = lock.newCondition();

    /**
     * Creates a queue bound to the given queue id.
     *
     * @param queueId the id of the work queue
     * @param queuing the Redis queuing implementation used to schedule and fetch works
     */
    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
        super(queueId, queuing);
        this.queuing = queuing;
    }

    /**
     * @return the metrics reported by the Redis queuing for this queue
     */
    @Override
    protected WorkQueueMetrics metrics() {
        return queuing.metrics(queueId);
    }

    /**
     * @return the {@code scheduled} counter of this queue's metrics, narrowed to an {@code int}
     */
    @Override
    public int getQueueSize() {
        return queuing.metrics(queueId).scheduled.intValue();
    }

    /**
     * Blocks until an element is available, by polling repeatedly with a one day timeout.
     *
     * @return the next element, never {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public Runnable take() throws InterruptedException {
        for (;;) {
            Runnable r = poll(1, TimeUnit.DAYS);
            if (r != null) {
                return r;
            }
        }
    }

    /**
     * Polls the queue until an element is found or the timeout elapses.
     * <p>
     * Between two attempts the thread waits on {@link #notEmpty}, so it is woken up either by a local
     * {@link #putElement} or after a randomized remote poll interval, whichever comes first. The wait never
     * exceeds the time remaining before the deadline.
     *
     * @param timeout how long to wait before giving up
     * @param unit the unit of {@code timeout}
     * @return the next element, or {@code null} if none became available in time
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        // inherited helper; NOTE(review): presumably returns the time budget left once the queue is active
        nanos = awaitActivation(nanos);
        if (nanos <= 0) {
            return null;
        }
        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
        for (;;) {
            Runnable r = poll();
            if (r != null) {
                return r;
            }
            if (timeUntil(end) == 0) {
                return null;
            }
            // cap the wait so that a short timeout is not overshot by a full remote poll interval;
            // a non-positive value makes await return immediately
            long waitMs = Math.min(getRemotePollInterval(), end - System.currentTimeMillis());
            lock.lock();
            try {
                // wake up if our instance has submitted a new job or wait
                notEmpty.await(waitMs, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * @return the delay in ms before the next remote poll: {@link #REMOTE_POLL_INTERVAL_MS} plus a random offset
     *         in {@code [-REMOTE_POLL_INTERVAL_STDEV_MS, REMOTE_POLL_INTERVAL_STDEV_MS)}
     */
    private int getRemotePollInterval() {
        // add some randomness so we don't generate periodic spike when all workers are starving
        return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS,
                REMOTE_POLL_INTERVAL_STDEV_MS);
    }

    /**
     * Schedules the work held by the given runnable and signals one local waiter.
     *
     * @param r the runnable holding the work to schedule
     * @throws RuntimeException wrapping the {@link IOException} if the work cannot be scheduled
     */
    @Override
    public void putElement(Runnable r) {
        Work work = WorkHolder.getWork(r);
        lock.lock();
        try {
            queuing.workSetScheduled(queueId, work);
            notEmpty.signal();
        } catch (IOException e) {
            log.error("Failed to add Work: " + work, e);
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Fetches the next work from the queue without blocking.
     * <p>
     * I/O and Redis connection errors are logged (at most once per {@link #LOG_INTERVAL} for each kind) and
     * reported as "no element".
     *
     * @return a holder for the next work, or {@code null} if the queue is empty or an error occurred
     */
    @Override
    public Runnable pollElement() {
        try {
            Work work = queuing.getWorkFromQueue(queueId);
            return work == null ? null : new WorkHolder(work);
        } catch (IOException e) {
            if (delayExpired(LAST_IO_EXCEPTION)) {
                // log full stacktrace
                log.error(e.getMessage(), e);
            }
            // for io errors make poll return no result
            return null;
        } catch (JedisConnectionException e) {
            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
                Throwable cause = e.getCause();
                // a Throwable's message may be null: guard it so that logging cannot itself throw
                String causeMessage = cause == null ? null : cause.getMessage();
                if (causeMessage != null && causeMessage.contains(ConnectException.class.getName())) {
                    // plain connection refusal: one-line error, stacktrace only at debug level
                    log.error(e.getMessage() + ": " + causeMessage);
                    log.debug(e.getMessage(), e);
                } else {
                    // log full stacktrace
                    log.error(e.getMessage(), e);
                }
            }
            // for connection errors make poll return no result
            return null;
        }
    }

    /**
     * Checks whether more than {@link #LOG_INTERVAL} ms elapsed since the timestamp stored in {@code atomic}
     * and, if so, atomically updates it to the current time.
     *
     * @param atomic holder of the last timestamp in ms
     * @return {@code true} if the delay expired and this call updated the timestamp, {@code false} if the delay
     *         has not expired or another thread updated the timestamp first
     */
    protected static boolean delayExpired(AtomicLong atomic) {
        long now = System.currentTimeMillis();
        long last = atomic.get();
        if (now > last + LOG_INTERVAL) {
            if (atomic.compareAndSet(last, now)) {
                return true;
            } // else some other thread beat us to it
        }
        return false;
    }

}