001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.time.Duration;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.concurrent.locks.Condition;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
035import org.nuxeo.ecm.core.work.WorkHolder;
036import org.nuxeo.ecm.core.work.api.Work;
037import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
038
039import redis.clients.jedis.exceptions.JedisConnectionException;
040
041/**
042 * Redis-based {@link BlockingQueue}.
043 * <p>
044 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
045 *
046 * @since 5.8
047 */
048public class RedisBlockingQueue extends NuxeoBlockingQueue {
049
050    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);
051
052    // this is so that we don't spam the logs with too many errors
053    private static final long LOG_INTERVAL = Duration.ofSeconds(10).toMillis();
054
055    private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);
056
057    private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);
058
059    private static final int REMOTE_POLL_INTERVAL_MS = 1000;
060
061    private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200;
062
063    protected final RedisWorkQueuing queuing;
064
065    protected final Lock lock = new ReentrantLock();
066    protected final Condition notEmpty = lock.newCondition();
067
068    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
069        super(queueId, queuing);
070        this.queuing = queuing;
071    }
072
073    @Override
074    protected WorkQueueMetrics metrics() {
075        return queuing.metrics(queueId);
076    }
077
078    @Override
079    public int getQueueSize() {
080        return queuing.metrics(queueId).scheduled.intValue();
081    }
082
083    @Override
084    public Runnable take() throws InterruptedException {
085        for (; ; ) {
086            Runnable r = poll(1, TimeUnit.DAYS);
087            if (r != null) {
088                return r;
089            }
090        }
091    }
092
093    @Override
094    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
095        long nanos = unit.toNanos(timeout);
096        nanos = awaitActivation(nanos);
097        if (nanos <= 0) {
098            return null;
099        }
100        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
101        for (; ; ) {
102            Runnable r = poll();
103            if (r != null) {
104                return r;
105            }
106            if (timeUntil(end) == 0) {
107                return null;
108            }
109            lock.lock();
110            try {
111                // wake up if our instance has submitted a new job or wait
112                notEmpty.await(getRemotePollInterval(), TimeUnit.MILLISECONDS);
113            } finally {
114                lock.unlock();
115            }
116
117        }
118    }
119
120    private int getRemotePollInterval() {
121        // add some randomness so we don't generate periodic spike when all workers are starving
122        return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS,
123                REMOTE_POLL_INTERVAL_STDEV_MS);
124    }
125
126    @Override
127    public void putElement(Runnable r) {
128        Work work = WorkHolder.getWork(r);
129        lock.lock();
130        try {
131            queuing.workSetScheduled(queueId, work);
132            notEmpty.signal();
133        } catch (IOException e) {
134            log.error("Failed to add Work: " + work, e);
135            throw new RuntimeException(e);
136        } finally {
137            lock.unlock();
138        }
139    }
140
141    @Override
142    public Runnable pollElement() {
143        try {
144            Work work = queuing.getWorkFromQueue(queueId);
145            return work == null ? null : new WorkHolder(work);
146        } catch (IOException e) {
147            if (delayExpired(LAST_IO_EXCEPTION)) {
148                // log full stacktrace
149                log.error(e.getMessage(), e);
150            }
151            // for io errors make poll return no result
152            return null;
153        } catch (JedisConnectionException e) {
154            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
155                Throwable cause = e.getCause();
156                if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) {
157                    log.error(e.getMessage() + ": " + cause.getMessage());
158                    log.debug(e.getMessage(), e);
159                } else {
160                    // log full stacktrace
161                    log.error(e.getMessage(), e);
162                }
163            }
164            // for connection errors make poll return no result
165            return null;
166        }
167    }
168
169    protected static boolean delayExpired(AtomicLong atomic) {
170        long now = System.currentTimeMillis();
171        long last = atomic.get();
172        if (now > last + LOG_INTERVAL) {
173            if (atomic.compareAndSet(last, now)) {
174                return true;
175            } // else some other thread beat us to it
176        }
177        return false;
178    }
179
180}