001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
024import org.nuxeo.ecm.core.work.WorkHolder;
025import org.nuxeo.ecm.core.work.api.Work;
026
027import java.io.IOException;
028import java.net.ConnectException;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicLong;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.Lock;
034import java.util.concurrent.locks.ReentrantLock;
035
036import redis.clients.jedis.exceptions.JedisConnectionException;
037
038/**
039 * Redis-based {@link BlockingQueue}.
040 * <p>
041 * It has unlimited capacity, so never blocks on {@link #put} and {@link #offer} always returns {@code true}.
042 *
043 * @since 5.8
044 */
045public class RedisBlockingQueue extends NuxeoBlockingQueue {
046
047    private static final Log log = LogFactory.getLog(RedisBlockingQueue.class);
048
049    // this is so that we don't spam the logs with too many errors
050    private static final long LOG_INTERVAL = 1000 * 10; // 10s
051
052    private static AtomicLong LAST_IO_EXCEPTION = new AtomicLong(0);
053
054    private static AtomicLong LAST_CONNECTION_EXCEPTION = new AtomicLong(0);
055
056    protected final String queueId;
057
058    protected final RedisWorkQueuing queuing;
059
060    protected final Lock lock = new ReentrantLock();
061    protected final Condition notEmpty = lock.newCondition();
062
063    public RedisBlockingQueue(String queueId, RedisWorkQueuing queuing) {
064        this.queueId = queueId;
065        this.queuing = queuing;
066    }
067
068    @Override
069    public int getQueueSize() {
070        return queuing.getScheduledSize(queueId);
071    }
072
073    @Override
074    public Runnable take() throws InterruptedException {
075        for (; ; ) {
076            Runnable r = poll(1, TimeUnit.DAYS);
077            if (r != null) {
078                return r;
079            }
080        }
081    }
082
083    @Override
084    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
085        long nanos = unit.toNanos(timeout);
086        nanos = awaitActivation(nanos);
087        if (nanos <= 0) {
088            return null;
089        }
090        long end = System.currentTimeMillis() + TimeUnit.NANOSECONDS.toMillis(nanos);
091        for (; ; ) {
092            Runnable r = poll();
093            if (r != null) {
094                return r;
095            }
096            if (timeUntil(end) == 0) {
097                return null;
098            }
099            lock.lock();
100            try {
101                notEmpty.await(1, TimeUnit.SECONDS);
102            } finally {
103                lock.unlock();
104            }
105
106        }
107    }
108
109    @Override
110    public void putElement(Runnable r) {
111        Work work = WorkHolder.getWork(r);
112        lock.lock();
113        try {
114            queuing.addScheduledWork(queueId, work);
115            notEmpty.signal();
116        } catch (IOException e) {
117            log.error("Failed to add Work: " + work, e);
118            throw new RuntimeException(e);
119        } finally {
120            lock.unlock();
121        }
122    }
123
124    @Override
125    public Runnable pollElement() {
126        try {
127            Work work = queuing.getWorkFromQueue(queueId);
128            if (work != null) {
129                log.debug("Remove scheduled " + work);
130            }
131            return work == null ? null : new WorkHolder(work);
132        } catch (IOException e) {
133            if (delayExpired(LAST_IO_EXCEPTION)) {
134                // log full stacktrace
135                log.error(e.getMessage(), e);
136            }
137            // for io errors make poll return no result
138            return null;
139        } catch (JedisConnectionException e) {
140            if (delayExpired(LAST_CONNECTION_EXCEPTION)) {
141                Throwable cause = e.getCause();
142                if (cause != null && cause.getMessage().contains(ConnectException.class.getName())) {
143                    log.error(e.getMessage() + ": " + cause.getMessage());
144                    log.debug(e.getMessage(), e);
145                } else {
146                    // log full stacktrace
147                    log.error(e.getMessage(), e);
148                }
149            }
150            // for connection errors make poll return no result
151            return null;
152        }
153    }
154
155    protected static boolean delayExpired(AtomicLong atomic) {
156        long now = System.currentTimeMillis();
157        long last = atomic.get();
158        if (now > last + LOG_INTERVAL) {
159            if (atomic.compareAndSet(last, now)) {
160                return true;
161            } // else some other thread beat us to it
162        }
163        return false;
164    }
165
166}