001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.AbstractQueue;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Condition;
027import java.util.concurrent.locks.ReentrantLock;
028
029import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
030
031/**
032 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
033 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
034 *
035 * @since 5.8
036 */
037public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
038
039    /*
040     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
041     * - isEmpty()
042     * - size()
043     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
044     * - take()
045     * - offer(e)
046     * - remove(e)
047     * - toArray(), toArray(a): for purge and shutdown
048     * - drainTo(c)
049     * - iterator() : hasNext(), next(), remove() (called by toArray)
050     */
051
052    protected final ReentrantLock activationLock = new ReentrantLock();
053
054    protected final Condition activation = activationLock.newCondition();
055
056    protected volatile boolean active = false;
057
058    protected final String queueId;
059
060    protected final WorkQueuing queuing;
061
062    protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) {
063        this.queueId = queueId;
064        this.queuing = queuing;
065    }
066
067    protected abstract WorkQueueMetrics metrics();
068
069    /**
070     * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves
071     * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all
072     * elements are again available.
073     *
074     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
075     */
076    public WorkQueueMetrics setActive(boolean active) {
077        this.active = active;
078        activationLock.lock();
079        try {
080            activation.signalAll();
081        } finally {
082            activationLock.unlock();
083        }
084        return metrics();
085    }
086
087    @Override
088    public boolean offer(Runnable r) {
089        try {
090            put(r);
091        } catch (InterruptedException e) {
092            Thread.currentThread().interrupt();
093            throw new RuntimeException(e);
094        }
095        return true;
096    }
097
098    @Override
099    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
100        // not needed for ThreadPoolExecutor
101        put(r);
102        return true;
103    }
104
105    @Override
106    public void put(Runnable r) throws InterruptedException {
107        putElement(r);
108    }
109
110    @Override
111    public Runnable peek() {
112        // not needed for ThreadPoolExecutor
113        throw new UnsupportedOperationException("not supported");
114    }
115
116    @Override
117    public Runnable poll() {
118        if (!active) {
119            return null;
120        }
121        Runnable runnable =  pollElement();
122        if (runnable == null) {
123            return null;
124        }
125        if (!active) {
126            queuing.workReschedule(queueId, WorkHolder.getWork(runnable));
127            return null;
128        }
129        return runnable;
130    }
131
132    protected long timeUntil(long end) {
133        long timeout = end - System.currentTimeMillis();
134        if (timeout < 0) {
135            timeout = 0;
136        }
137        return timeout;
138    }
139
140    protected long awaitActivation(long nanos) throws InterruptedException {
141        activationLock.lock();
142        try {
143            while (nanos > 0 && !active) {
144                nanos = activation.awaitNanos(nanos);
145            }
146
147        } finally {
148            activationLock.unlock();
149        }
150        return nanos;
151    }
152
153    @Override
154    public boolean contains(Object o) {
155        // TODO Auto-generated method stub
156        throw new UnsupportedOperationException();
157    }
158
159    @Override
160    public int size() {
161        return getQueueSize();
162    }
163
164    @Override
165    public int remainingCapacity() {
166        return Integer.MAX_VALUE;
167    }
168
169    @Override
170    public Iterator<Runnable> iterator() {
171        return new Iter();
172    }
173
174    /*
175     * Used by drainQueue/purge methods of ThreadPoolExector through toArray.
176     */
177    private class Iter implements Iterator<Runnable> {
178
179        @Override
180        public boolean hasNext() {
181            return false;
182        }
183
184        @Override
185        public Runnable next() {
186            throw new UnsupportedOperationException();
187        }
188
189        @Override
190        public void remove() {
191            throw new UnsupportedOperationException();
192        }
193    }
194
195    @Override
196    public int drainTo(Collection<? super Runnable> c) {
197        return drainTo(c, Integer.MAX_VALUE);
198    }
199
200    @Override
201    public int drainTo(Collection<? super Runnable> c, int maxElements) {
202        for (int i = 0; i < maxElements; i++) {
203            Runnable r = poll();
204            if (r == null) {
205                return i;
206            }
207            c.add(r);
208        }
209        return maxElements;
210    }
211
212    /**
213     * Gets the size of the queue.
214     */
215    public abstract int getQueueSize();
216
217    /**
218     * Adds an element into this queue, waiting if necessary for space to become available.
219     */
220    public abstract void putElement(Runnable r) throws InterruptedException;
221
222    /**
223     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
224     */
225    public abstract Runnable pollElement();
226
227}