001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.redis.contribs;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
022import org.nuxeo.ecm.core.work.WorkHolder;
023import org.nuxeo.ecm.core.work.api.Work;
024
025import java.io.IOException;
026import java.net.ConnectException;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.Lock;
032import java.util.concurrent.locks.ReentrantLock;
033
034import redis.clients.jedis.exceptions.JedisConnectionException;
035
036/**
037 * Redis-based {@link BlockingQueue}.
038 * <p>
039 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
040 *
041 * @since 5.8
042 */
043public class RedisBlockingQueue extends NuxeoBlockingQueue {
044
045    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);
046
047    // this is so that we don't spam the logs with too many errors
048    private static final long LOG_INTERVAL = 1000 * 10; // 10s
049
050    private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);
051
052    private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);
053
054    protected final String queueId;
055
056    protected final RedisWorkQueuing queuing;
057
058    protected final Lock lock = new ReentrantLock();
059    protected final Condition notEmpty = lock.newCondition();
060
061    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
062        this.queueId = queueId;
063        this.queuing = queuing;
064    }
065
066    @Override
067    public int getQueueSize() {
068        return queuing.getScheduledSize(queueId);
069    }
070
071    @Override
072    public Runnable take() throws InterruptedException {
073        for (; ; ) {
074            Runnable r = poll(1, TimeUnit.DAYS);
075            if (r != null) {
076                return r;
077            }
078        }
079    }
080
081    @Override
082    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
083        long nanos = unit.toNanos(timeout);
084        nanos = awaitActivation(nanos);
085        if (nanos <= 0) {
086            return null;
087        }
088        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
089        for (; ; ) {
090            Runnable r = poll();
091            if (r != null) {
092                return r;
093            }
094            if (timeUntil(end) == 0) {
095                return null;
096            }
097            lock.lock();
098            try {
099                notEmpty.await(1, TimeUnit.SECONDS);
100            } finally {
101                lock.unlock();
102            }
103
104        }
105    }
106
107    @Override
108    public void putElement(Runnable r) {
109        Work work = WorkHolder.getWork(r);
110        lock.lock();
111        try {
112            queuing.addScheduledWork(queueId, work);
113            notEmpty.signal();
114        } catch (IOException e) {
115            log.error("Failed to add Work: " + work, e);
116            throw new RuntimeException(e);
117        } finally {
118            lock.unlock();
119        }
120    }
121
122    @Override
123    public Runnable pollElement() {
124        try {
125            Work work = queuing.removeScheduledWork(queueId);
126            if (work != null) {
127                log.debug("Remove scheduled " + work);
128            }
129            return work == null ? null : new WorkHolder(work);
130        } catch (IOException e) {
131            if (delayExpired(LAST_IO_EXCEPTION)) {
132                // log full stacktrace
133                log.error(e.getMessage(), e);
134            }
135            // for io errors make poll return no result
136            return null;
137        } catch (JedisConnectionException e) {
138            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
139                Throwable cause = e.getCause();
140                if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) {
141                    log.error(e.getMessage() + ": " + cause.getMessage());
142                    log.debug(e.getMessage(), e);
143                } else {
144                    // log full stacktrace
145                    log.error(e.getMessage(), e);
146                }
147            }
148            // for connection errors make poll return no result
149            return null;
150        }
151    }
152
153    protected static boolean delayExpired(AtomicLong atomic) {
154        long now = System.currentTimeMillis();
155        long last = atomic.get();
156        if (now > last + LOG_INTERVAL) {
157            if (atomic.compareAndSet(last, now)) {
158                return true;
159            } // else some other thread beat us to it
160        }
161        return false;
162    }
163
164}