001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ThreadLocalRandom;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicLong;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReentrantLock;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
034import org.nuxeo.ecm.core.work.WorkHolder;
035import org.nuxeo.ecm.core.work.api.Work;
036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
037
038import redis.clients.jedis.exceptions.JedisConnectionException;
039
040/**
041 * Redis-based {@link BlockingQueue}.
042 * <p>
043 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
044 *
045 * @since 5.8
046 */
047public class RedisBlockingQueue extends NuxeoBlockingQueue {
048
049    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);
050
051    // this is so that we don't spam the logs with too many errors
052    private static final long LOG_INTERVAL = 1000 * 10; // 10s
053
054    private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);
055
056    private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);
057
058    private static final int REMOTE_POLL_INTERVAL_MS = 1000;
059
060    private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200;
061
062    protected final RedisWorkQueuing queuing;
063
064    protected final Lock lock = new ReentrantLock();
065    protected final Condition notEmpty = lock.newCondition();
066
067    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
068        super(queueId, queuing);
069        this.queuing = queuing;
070    }
071
072    @Override
073    protected WorkQueueMetrics metrics() {
074        return queuing.metrics(queueId);
075    }
076
077    @Override
078    public int getQueueSize() {
079        return queuing.metrics(queueId).scheduled.intValue();
080    }
081
082    @Override
083    public Runnable take() throws InterruptedException {
084        for (; ; ) {
085            Runnable r = poll(1, TimeUnit.DAYS);
086            if (r != null) {
087                return r;
088            }
089        }
090    }
091
092    @Override
093    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
094        long nanos = unit.toNanos(timeout);
095        nanos = awaitActivation(nanos);
096        if (nanos <= 0) {
097            return null;
098        }
099        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
100        for (; ; ) {
101            Runnable r = poll();
102            if (r != null) {
103                return r;
104            }
105            if (timeUntil(end) == 0) {
106                return null;
107            }
108            lock.lock();
109            try {
110                // wake up if our instance has submitted a new job or wait
111                notEmpty.await(getRemotePollInterval(), TimeUnit.MILLISECONDS);
112            } finally {
113                lock.unlock();
114            }
115
116        }
117    }
118
119    private int getRemotePollInterval() {
120        // add some randomness so we don't generate periodic spike when all workers are starving
121        return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS,
122                REMOTE_POLL_INTERVAL_STDEV_MS);
123    }
124
125    @Override
126    public void putElement(Runnable r) {
127        Work work = WorkHolder.getWork(r);
128        lock.lock();
129        try {
130            queuing.workSetScheduled(queueId, work);
131            notEmpty.signal();
132        } catch (IOException e) {
133            log.error("Failed to add Work: " + work, e);
134            throw new RuntimeException(e);
135        } finally {
136            lock.unlock();
137        }
138    }
139
140    @Override
141    public Runnable pollElement() {
142        try {
143            Work work = queuing.getWorkFromQueue(queueId);
144            return work == null ? null : new WorkHolder(work);
145        } catch (IOException e) {
146            if (delayExpired(LAST_IO_EXCEPTION)) {
147                // log full stacktrace
148                log.error(e.getMessage(), e);
149            }
150            // for io errors make poll return no result
151            return null;
152        } catch (JedisConnectionException e) {
153            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
154                Throwable cause = e.getCause();
155                if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) {
156                    log.error(e.getMessage() + ": " + cause.getMessage());
157                    log.debug(e.getMessage(), e);
158                } else {
159                    // log full stacktrace
160                    log.error(e.getMessage(), e);
161                }
162            }
163            // for connection errors make poll return no result
164            return null;
165        }
166    }
167
168    protected static boolean delayExpired(AtomicLong atomic) {
169        long now = System.currentTimeMillis();
170        long last = atomic.get();
171        if (now > last + LOG_INTERVAL) {
172            if (atomic.compareAndSet(last, now)) {
173                return true;
174            } // else some other thread beat us to it
175        }
176        return false;
177    }
178
179}