001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.work;
018
019import java.util.AbstractQueue;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.locks.Condition;
025import java.util.concurrent.locks.ReentrantLock;
026
/**
 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
 * <p>
 * Concrete subclasses provide the storage through {@link #getQueueSize()}, {@link #putElement(Runnable)} and
 * {@link #pollElement()}, plus the blocking retrieval methods of {@link BlockingQueue} that are not implemented
 * here.
 *
 * @since 5.8
 */
public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

    /*
     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
     * - isEmpty()
     * - size()
     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
     * - take()
     * - offer(e)
     * - remove(e)
     * - toArray(), toArray(a): for purge and shutdown
     * - drainTo(c)
     * - iterator(): hasNext(), next(), remove() (called by toArray)
     */

    /** Lock guarding the {@link #activation} condition. */
    protected final ReentrantLock activationLock = new ReentrantLock();

    /** Condition signalled every time {@link #setActive(boolean)} is called. */
    protected final Condition activation = activationLock.newCondition();

    /** Whether elements may currently be taken from the queue; volatile so it can be read without the lock. */
    protected volatile boolean active = true;

    /**
     * Sets the queue active or inactive. While inactive, {@link #poll()} returns {@code null} as if the queue was
     * empty, and so do the {@code drainTo} methods which are built on it. Elements can still be added when the queue
     * is deactivated. When reactivated, all elements are again available.
     * <p>
     * Threads blocked in {@link #awaitActivation(long)} are woken up so they can re-check the flag.
     *
     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
     */
    public void setActive(boolean active) {
        // The flag is published first; waiters re-read it under the lock after being signalled.
        this.active = active;
        activationLock.lock();
        try {
            activation.signalAll();
        } finally {
            activationLock.unlock();
        }
    }

    /**
     * Adds an element by delegating to {@link #put(Runnable)}.
     *
     * @param r the element to add
     * @return always {@code true}
     * @throws RuntimeException if the calling thread is interrupted while adding; the interrupt status is restored
     *             before throwing
     */
    @Override
    public boolean offer(Runnable r) {
        try {
            put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            throw new RuntimeException("interrupted", e);
        }
        return true;
    }

    /**
     * Adds an element by delegating to {@link #put(Runnable)}; the timeout arguments are ignored.
     *
     * @param r the element to add
     * @param timeout ignored
     * @param unit ignored
     * @return always {@code true}
     * @throws InterruptedException if {@link #put(Runnable)} is interrupted
     */
    @Override
    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        // not needed for ThreadPoolExecutor
        put(r);
        return true;
    }

    /**
     * Adds an element by delegating to {@link #putElement(Runnable)}.
     *
     * @param r the element to add
     * @throws InterruptedException if {@link #putElement(Runnable)} is interrupted
     */
    @Override
    public void put(Runnable r) throws InterruptedException {
        putElement(r);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Runnable peek() {
        // not needed for ThreadPoolExecutor
        throw new UnsupportedOperationException("not supported");
    }

    /**
     * Retrieves and removes an element without blocking.
     *
     * @return the element returned by {@link #pollElement()}, or {@code null} if the queue is inactive
     */
    @Override
    public Runnable poll() {
        if (!active) {
            return null;
        }
        return pollElement();
    }

    /**
     * Computes the time left until a deadline, never going below zero.
     *
     * @param end the deadline, in milliseconds on the {@link System#currentTimeMillis()} time scale
     * @return the number of milliseconds remaining until {@code end}, or {@code 0} if it has already passed
     */
    protected long timeUntil(long end) {
        long timeout = end - System.currentTimeMillis();
        if (timeout < 0) {
            timeout = 0;
        }
        return timeout;
    }

    /**
     * Waits until the queue is active or the given time has elapsed, whichever comes first.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @return {@code nanos} unchanged if no wait was needed, otherwise the value returned by the last
     *         {@link Condition#awaitNanos(long)} call (zero or negative when the wait timed out)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected long awaitActivation(long nanos) throws InterruptedException {
        activationLock.lock();
        try {
            // Loop to cope with spurious wakeups and with setActive(false) signals.
            while (nanos > 0 && !active) {
                nanos = activation.awaitNanos(nanos);
            }

        } finally {
            activationLock.unlock();
        }
        return nanos;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the queue size as reported by {@link #getQueueSize()}.
     */
    @Override
    public int size() {
        return getQueueSize();
    }

    /**
     * Returns {@link Integer#MAX_VALUE}: this class places no bound on the number of elements.
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Not supported by this base class: creating the iterator throws {@link UnsupportedOperationException}.
     */
    @Override
    public Iterator<Runnable> iterator() {
        return new Iter();
    }

    /*
     * Used by drainQueue/purge methods of ThreadPoolExecutor through toArray.
     *
     * Placeholder implementation: instantiation and every method throw UnsupportedOperationException.
     */
    private class Iter implements Iterator<Runnable> {

        public Iter() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean hasNext() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Runnable next() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Removes all elements currently available through {@link #poll()} and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements available through {@link #poll()} and adds them to the given
     * collection. Nothing is transferred while the queue is inactive, since {@link #poll()} then returns
     * {@code null}.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer; zero or a negative value transfers nothing
     * @return the number of elements transferred, never negative
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements) {
        // Argument checks required by the BlockingQueue contract, done before touching the queue.
        if (c == null) {
            throw new NullPointerException("c");
        }
        if (c == this) {
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
        int drained = 0;
        while (drained < maxElements) {
            Runnable r = poll();
            if (r == null) {
                break;
            }
            c.add(r);
            drained++;
        }
        return drained;
    }

    /**
     * Gets the size of the queue.
     */
    public abstract int getQueueSize();

    /**
     * Adds an element into this queue, waiting if necessary for space to become available.
     */
    public abstract void putElement(Runnable r) throws InterruptedException;

    /**
     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
     */
    public abstract Runnable pollElement();

}