001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 024import org.nuxeo.ecm.core.work.WorkHolder; 025import org.nuxeo.ecm.core.work.api.Work; 026 027import java.io.IOException; 028import java.net.ConnectException; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.Lock; 034import java.util.concurrent.locks.ReentrantLock; 035 036import redis.clients.jedis.exceptions.JedisConnectionException; 037 038/** 039 * Redis-based {@link BlockingQueue}. 040 * <p> 041 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}. 042 * 043 * @since 5.8 044 */ 045public class RedisBlockingQueue extends NuxeoBlockingQueue { 046 047 private static final Log log = LogFactory.getLog(RedisBlockingQueue.class); 048 049 // this is so that we don't spam the logs with too many errors 050 private static final long LOG_INTERVAL = 1000 * 10; // 10s 051 052 private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0); 053 054 private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0); 055 056 protected final String queueId; 057 058 protected final RedisWorkQueuing queuing; 059 060 protected final Lock lock = new ReentrantLock(); 061 protected final Condition notEmpty = lock.newCondition(); 062 063 public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) { 064 this.queueId = queueId; 065 this.queuing = queuing; 066 } 067 068 @Override 069 public int getQueueSize() { 070 return queuing.getScheduledSize(queueId); 071 } 072 073 @Override 074 public Runnable take() throws InterruptedException { 075 for (; ; ) { 076 Runnable r = poll(1, TimeUnit.DAYS); 077 if (r != null) { 078 return r; 079 } 080 } 081 } 082 083 @Override 084 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 085 long nanos = unit.toNanos(timeout); 086 nanos = awaitActivation(nanos); 087 if (nanos <= 0) { 088 return null; 089 } 090 long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos); 091 for (; ; ) { 092 Runnable r = poll(); 093 if (r != null) { 094 return r; 095 } 096 if (timeUntil(end) == 0) { 097 return null; 098 } 099 lock.lock(); 100 try { 101 notEmpty.await(1, TimeUnit.SECONDS); 102 } finally { 103 lock.unlock(); 104 } 105 106 } 107 } 108 109 @Override 110 public void putElement(Runnable r) { 111 Work work = WorkHolder.getWork(r); 112 lock.lock(); 113 try { 114 queuing.addScheduledWork(queueId, work); 115 notEmpty.signal(); 116 } catch (IOException e) { 117 log.error("Failed to add Work: " + work, e); 118 throw new RuntimeException(e); 119 } finally { 120 lock.unlock(); 121 } 122 } 123 124 @Override 125 public Runnable pollElement() { 126 try { 127 Work work = queuing.getWorkFromQueue(queueId); 128 if (work != null) { 129 log.debug("Remove scheduled " + work); 130 } 131 return work == null ? null : new WorkHolder(work); 132 } catch (IOException e) { 133 if (delayExpired(LAST_IO_EXCEPTION)) { 134 // log full stacktrace 135 log.error(e.getMessage(), e); 136 } 137 // for io errors make poll return no result 138 return null; 139 } catch (JedisConnectionException e) { 140 if (delayExpired(LAST_CONNECTION_EXCEPTION)) { 141 Throwable cause = e.getCause(); 142 if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) { 143 log.error(e.getMessage() + ": " + cause.getMessage()); 144 log.debug(e.getMessage(), e); 145 } else { 146 // log full stacktrace 147 log.error(e.getMessage(), e); 148 } 149 } 150 // for connection errors make poll return no result 151 return null; 152 } 153 } 154 155 protected static boolean delayExpired(AtomicLong atomic) { 156 long now = System.currentTimeMillis(); 157 long last = atomic.get(); 158 if (now > last + LOG_INTERVAL) { 159 if (atomic.compareAndSet(last, now)) { 160 return true; 161 } // else some other thread beat us to it 162 } 163 return false; 164 } 165 166}