001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
030
031/**
032 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
033 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
034 *
035 * @since 5.8
036 */
037public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
038
039    /*
040     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
041     * - isEmpty()
042     * - size()
043     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
044     * - take()
045     * - offer(e)
046     * - remove(e)
047     * - toArray(), toArray(a): for purge and shutdown
048     * - drainTo(c)
049     * - iterator() : hasNext(), next(), remove() (called by toArray)
050     */
051
052    protected final ReentrantLock activationLock = new ReentrantLock();
053
054    protected final Condition activation = activationLock.newCondition();
055
056    protected volatile boolean active = true;
057
058    protected final String queueId;
059
060    protected final WorkQueuing queuing;
061
062    protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) {
063        this.queueId = queueId;
064        this.queuing = queuing;
065    }
066
067    protected abstract WorkQueueMetrics metrics();
068
069    /**
070     * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves
071     * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all
072     * elements are again available.
073     *
074     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
075     */
076    public WorkQueueMetrics setActive(boolean active) {
077        WorkQueueMetrics metrics = metrics();
078        this.active = active;
079        activationLock.lock();
080        try {
081            activation.signalAll();
082        } finally {
083            activationLock.unlock();
084        }
085        return metrics;
086    }
087
088    @Override
089    public boolean offer(Runnable r) {
090        try {
091            put(r);
092        } catch (InterruptedException e) {
093            Thread.currentThread().interrupt(); // restore interrupt status
094            throw new RuntimeException("interrupted", e);
095        }
096        return true;
097    }
098
099    @Override
100    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
101        // not needed for ThreadPoolExecutor
102        put(r);
103        return true;
104    }
105
106    @Override
107    public void put(Runnable r) throws InterruptedException {
108        putElement(r);
109    }
110
111    @Override
112    public Runnable peek() {
113        // not needed for ThreadPoolExecutor
114        throw new UnsupportedOperationException("not supported");
115    }
116
117    @Override
118    public Runnable poll() {
119        if (!active) {
120            return null;
121        }
122        return pollElement();
123    }
124
125    protected long timeUntil(long end) {
126        long timeout = end - System.currentTimeMillis();
127        if (timeout < 0) {
128            timeout = 0;
129        }
130        return timeout;
131    }
132
133    protected long awaitActivation(long nanos) throws InterruptedException {
134        activationLock.lock();
135        try {
136            while (nanos > 0 && !active) {
137                nanos = activation.awaitNanos(nanos);
138            }
139
140        } finally {
141            activationLock.unlock();
142        }
143        return nanos;
144    }
145
146    @Override
147    public boolean contains(Object o) {
148        // TODO Auto-generated method stub
149        throw new UnsupportedOperationException();
150    }
151
152    @Override
153    public int size() {
154        return getQueueSize();
155    }
156
157    @Override
158    public int remainingCapacity() {
159        return Integer.MAX_VALUE;
160    }
161
162    @Override
163    public Iterator<Runnable> iterator() {
164        return new Iter();
165    }
166
167    /*
168     * Used by drainQueue/purge methods of ThreadPoolExector through toArray.
169     */
170    private class Iter implements Iterator<Runnable> {
171
172        @Override
173        public boolean hasNext() {
174            return false;
175        }
176
177        @Override
178        public Runnable next() {
179            throw new UnsupportedOperationException();
180        }
181
182        @Override
183        public void remove() {
184            throw new UnsupportedOperationException();
185        }
186    }
187
188    @Override
189    public int drainTo(Collection<? super Runnable> c) {
190        return drainTo(c, Integer.MAX_VALUE);
191    }
192
193    @Override
194    public int drainTo(Collection<? super Runnable> c, int maxElements) {
195        for (int i = 0; i < maxElements; i++) {
196            Runnable r = poll();
197            if (r == null) {
198                return i;
199            }
200            c.add(r);
201        }
202        return maxElements;
203    }
204
205    /**
206     * Gets the size of the queue.
207     */
208    public abstract int getQueueSize();
209
210    /**
211     * Adds an element into this queue, waiting if necessary for space to become available.
212     */
213    public abstract void putElement(Runnable r) throws InterruptedException;
214
215    /**
216     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
217     */
218    public abstract Runnable pollElement();
219
220}