001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
024import org.nuxeo.ecm.core.work.WorkHolder;
025import org.nuxeo.ecm.core.work.api.Work;
026
027import java.io.IOException;
028import java.net.ConnectException;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.locks.Condition;
034import java.util.concurrent.locks.Lock;
035import java.util.concurrent.locks.ReentrantLock;
036
037import redis.clients.jedis.exceptions.JedisConnectionException;
038
039/**
040 * Redis-based {@link BlockingQueue}.
041 * <p>
042 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
043 *
044 * @since 5.8
045 */
046public class RedisBlockingQueue extends NuxeoBlockingQueue {
047
048    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);
049
050    // this is so that we don't spam the logs with too many errors
051    private static final long LOG_INTERVAL = 1000 * 10; // 10s
052
053    private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);
054
055    private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);
056
057    private static final int REMOTE_POLL_INTERVAL_MS = 1000;
058
059    private static final int REMOTE_POLL_INTERVAL_STDEV_MS = 200;
060
061    protected final String queueId;
062
063    protected final RedisWorkQueuing queuing;
064
065    protected final Lock lock = new ReentrantLock();
066    protected final Condition notEmpty = lock.newCondition();
067
068    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
069        this.queueId = queueId;
070        this.queuing = queuing;
071    }
072
073    @Override
074    public int getQueueSize() {
075        return queuing.getScheduledSize(queueId);
076    }
077
078    @Override
079    public Runnable take() throws InterruptedException {
080        for (; ; ) {
081            Runnable r = poll(1, TimeUnit.DAYS);
082            if (r != null) {
083                return r;
084            }
085        }
086    }
087
088    @Override
089    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
090        long nanos = unit.toNanos(timeout);
091        nanos = awaitActivation(nanos);
092        if (nanos <= 0) {
093            return null;
094        }
095        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
096        for (; ; ) {
097            Runnable r = poll();
098            if (r != null) {
099                return r;
100            }
101            if (timeUntil(end) == 0) {
102                return null;
103            }
104            lock.lock();
105            try {
106                // wake up if our instance has submitted a new job or wait
107                notEmpty.await(getRemotePollInterval(), TimeUnit.MILLISECONDS);
108            } finally {
109                lock.unlock();
110            }
111
112        }
113    }
114
115    private int getRemotePollInterval() {
116        // add some randomness so we don't generate periodic spike when all workers are starving
117        return REMOTE_POLL_INTERVAL_MS + ThreadLocalRandom.current().nextInt(-1 * REMOTE_POLL_INTERVAL_STDEV_MS,
118                REMOTE_POLL_INTERVAL_STDEV_MS);
119    }
120
121    @Override
122    public void putElement(Runnable r) {
123        Work work = WorkHolder.getWork(r);
124        lock.lock();
125        try {
126            queuing.addScheduledWork(queueId, work);
127            notEmpty.signal();
128        } catch (IOException e) {
129            log.error("Failed to add Work: " + work, e);
130            throw new RuntimeException(e);
131        } finally {
132            lock.unlock();
133        }
134    }
135
136    @Override
137    public Runnable pollElement() {
138        try {
139            Work work = queuing.getWorkFromQueue(queueId);
140            return work == null ? null : new WorkHolder(work);
141        } catch (IOException e) {
142            if (delayExpired(LAST_IO_EXCEPTION)) {
143                // log full stacktrace
144                log.error(e.getMessage(), e);
145            }
146            // for io errors make poll return no result
147            return null;
148        } catch (JedisConnectionException e) {
149            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
150                Throwable cause = e.getCause();
151                if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) {
152                    log.error(e.getMessage() + ": " + cause.getMessage());
153                    log.debug(e.getMessage(), e);
154                } else {
155                    // log full stacktrace
156                    log.error(e.getMessage(), e);
157                }
158            }
159            // for connection errors make poll return no result
160            return null;
161        }
162    }
163
164    protected static boolean delayExpired(AtomicLong atomic) {
165        long now = System.currentTimeMillis();
166        long last = atomic.get();
167        if (now > last + LOG_INTERVAL) {
168            if (atomic.compareAndSet(last, now)) {
169                return true;
170            } // else some other thread beat us to it
171        }
172        return false;
173    }
174
175}