001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.AbstractQueue;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Condition;
027import java.util.concurrent.locks.ReentrantLock;
028
/**
 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer(Runnable)} always succeeds.
 * <p>
 * Subclasses provide the storage through {@link #getQueueSize()}, {@link #putElement(Runnable)} and
 * {@link #pollElement()}, and must also implement the blocking retrieval methods of {@link BlockingQueue} that are
 * not defined here.
 *
 * @since 5.8
 */
public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

    /*
     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
     * - isEmpty()
     * - size()
     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
     * - take()
     * - offer(e)
     * - remove(e)
     * - toArray(), toArray(a): for purge and shutdown
     * - drainTo(c)
     * - iterator() : hasNext(), next(), remove() (called by toArray)
     */

    /** Lock guarding the {@link #activation} condition. */
    protected final ReentrantLock activationLock = new ReentrantLock();

    /** Condition signalled every time {@link #setActive(boolean)} is called. */
    protected final Condition activation = activationLock.newCondition();

    /** Whether elements may currently be taken from the queue; volatile so it can be read without the lock. */
    protected volatile boolean active = true;

    /**
     * Sets the queue active or inactive. When deactivated, {@link #poll()} behaves as if the queue was empty.
     * Elements can still be added when the queue is deactivated. When reactivated, all elements are again available.
     * <p>
     * Threads blocked in {@link #awaitActivation(long)} are woken up so that they can re-check the flag. Note that
     * {@link #peek()} is not supported by this class, whatever the activation state.
     *
     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
     */
    public void setActive(boolean active) {
        // The flag is volatile, so lock-free readers such as poll() see the new value immediately.
        this.active = active;
        // Signalling requires holding the lock; waiters re-evaluate the flag once they are woken up.
        activationLock.lock();
        try {
            activation.signalAll();
        } finally {
            activationLock.unlock();
        }
    }

    /**
     * Adds the element to the queue by delegating to {@link #put(Runnable)}.
     *
     * @param r the element to add
     * @return always {@code true}
     * @throws RuntimeException if the thread is interrupted while adding; the interrupt status is restored first
     */
    @Override
    public boolean offer(Runnable r) {
        try {
            put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            throw new RuntimeException("interrupted", e);
        }
        return true;
    }

    /**
     * Adds the element to the queue by delegating to {@link #put(Runnable)}; the timeout is ignored.
     *
     * @param r the element to add
     * @param timeout ignored
     * @param unit ignored
     * @return always {@code true}
     * @throws InterruptedException if {@link #put(Runnable)} is interrupted
     */
    @Override
    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        // not needed for ThreadPoolExecutor
        put(r);
        return true;
    }

    /**
     * Adds the element to the queue by delegating to {@link #putElement(Runnable)}.
     *
     * @param r the element to add
     * @throws InterruptedException if {@link #putElement(Runnable)} is interrupted
     */
    @Override
    public void put(Runnable r) throws InterruptedException {
        putElement(r);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Runnable peek() {
        // not needed for ThreadPoolExecutor
        throw new UnsupportedOperationException("not supported");
    }

    /**
     * Retrieves and removes an element without waiting.
     *
     * @return the element returned by {@link #pollElement()}, or {@code null} if the queue is inactive
     */
    @Override
    public Runnable poll() {
        if (!active) {
            return null;
        }
        return pollElement();
    }

    /**
     * Computes the time remaining until a deadline expressed on the {@link System#currentTimeMillis()} clock.
     *
     * @param end the deadline, in milliseconds, on the {@link System#currentTimeMillis()} scale
     * @return the number of milliseconds left before {@code end}, or {@code 0} if it has already passed
     */
    protected long timeUntil(long end) {
        long timeout = end - System.currentTimeMillis();
        if (timeout < 0) {
            timeout = 0;
        }
        return timeout;
    }

    /**
     * Waits until the queue is active or the given time budget is exhausted, whichever comes first. Returns
     * immediately if the queue is already active or if {@code nanos} is not positive.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @return the remaining time budget in nanoseconds; a value less than or equal to zero means no time is left
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected long awaitActivation(long nanos) throws InterruptedException {
        activationLock.lock();
        try {
            // Loop to cope with spurious wake-ups and with signals sent for a deactivation.
            while (nanos > 0 && !active) {
                nanos = activation.awaitNanos(nanos);
            }
        } finally {
            activationLock.unlock();
        }
        return nanos;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException();
    }

    /**
     * @return the value of {@link #getQueueSize()}
     */
    @Override
    public int size() {
        return getQueueSize();
    }

    /**
     * @return always {@link Integer#MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an iterator that never yields any element, whatever the content of the queue.
     */
    @Override
    public Iterator<Runnable> iterator() {
        return new Iter();
    }

    /*
     * Used by drainQueue/purge methods of ThreadPoolExecutor through toArray. Always empty; it holds no reference to
     * the enclosing queue, hence static.
     */
    private static final class Iter implements Iterator<Runnable> {

        @Override
        public boolean hasNext() {
            return false;
        }

        @Override
        public Runnable next() {
            // hasNext() is always false, so the Iterator contract mandates NoSuchElementException here
            throw new java.util.NoSuchElementException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Removes all elements available through {@link #poll()} and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements available through {@link #poll()} and adds them to the given
     * collection. Nothing is transferred while the queue is inactive, as {@link #poll()} then returns {@code null}.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer; nothing is transferred if not positive
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements) {
        // Validate before polling so that no element is removed from the queue and then lost.
        if (c == null) {
            throw new NullPointerException("c");
        }
        if (c == this) {
            // Draining into ourselves would re-queue every polled element, as offer always succeeds.
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
        int drained = 0;
        while (drained < maxElements) {
            Runnable r = poll();
            if (r == null) {
                break;
            }
            c.add(r);
            drained++;
        }
        return drained;
    }

    /**
     * Gets the size of the queue.
     */
    public abstract int getQueueSize();

    /**
     * Adds an element into this queue, waiting if necessary for space to become available.
     */
    public abstract void putElement(Runnable r) throws InterruptedException;

    /**
     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
     */
    public abstract Runnable pollElement();

}