001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.AbstractQueue;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.NoSuchElementException;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
031
032/**
033 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
034 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
035 *
036 * @since 5.8
037 */
038public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
039
040    /*
041     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
042     * - isEmpty()
043     * - size()
044     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
045     * - take()
046     * - offer(e)
047     * - remove(e)
048     * - toArray(), toArray(a): for purge and shutdown
049     * - drainTo(c)
050     * - iterator() : hasNext(), next(), remove() (called by toArray)
051     */
052
053    protected final ReentrantLock activationLock = new ReentrantLock();
054
055    protected final Condition activation = activationLock.newCondition();
056
057    protected volatile boolean active = false;
058
059    protected final String queueId;
060
061    protected final WorkQueuing queuing;
062
063    protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) {
064        this.queueId = queueId;
065        this.queuing = queuing;
066    }
067
068    protected abstract WorkQueueMetrics metrics();
069
070    /**
071     * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves
072     * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all
073     * elements are again available.
074     *
075     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
076     */
077    public WorkQueueMetrics setActive(boolean active) {
078        this.active = active;
079        activationLock.lock();
080        try {
081            activation.signalAll();
082        } finally {
083            activationLock.unlock();
084        }
085        return metrics();
086    }
087
088    @Override
089    public boolean offer(Runnable r) {
090        try {
091            put(r);
092        } catch (InterruptedException e) {
093            Thread.currentThread().interrupt();
094            throw new RuntimeException(e);
095        }
096        return true;
097    }
098
099    @Override
100    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
101        // not needed for ThreadPoolExecutor
102        put(r);
103        return true;
104    }
105
106    @Override
107    public void put(Runnable r) throws InterruptedException {
108        putElement(r);
109    }
110
111    @Override
112    public Runnable peek() {
113        // not needed for ThreadPoolExecutor
114        throw new UnsupportedOperationException("not supported");
115    }
116
117    @Override
118    public Runnable poll() {
119        if (!active) {
120            return null;
121        }
122        Runnable runnable =  pollElement();
123        if (runnable == null) {
124            return null;
125        }
126        if (!active) {
127            queuing.workReschedule(queueId, WorkHolder.getWork(runnable));
128            return null;
129        }
130        return runnable;
131    }
132
133    protected long timeUntil(long end) {
134        long timeout = end - System.currentTimeMillis();
135        if (timeout < 0) {
136            timeout = 0;
137        }
138        return timeout;
139    }
140
141    protected long awaitActivation(long nanos) throws InterruptedException {
142        activationLock.lock();
143        try {
144            while (nanos > 0 && !active) {
145                nanos = activation.awaitNanos(nanos);
146            }
147
148        } finally {
149            activationLock.unlock();
150        }
151        return nanos;
152    }
153
154    @Override
155    public boolean contains(Object o) {
156        // TODO Auto-generated method stub
157        throw new UnsupportedOperationException();
158    }
159
160    @Override
161    public int size() {
162        return getQueueSize();
163    }
164
165    @Override
166    public int remainingCapacity() {
167        return Integer.MAX_VALUE;
168    }
169
170    @Override
171    public Iterator<Runnable> iterator() {
172        return new Iter();
173    }
174
175    /*
176     * Used by drainQueue/purge methods of ThreadPoolExector through toArray.
177     */
178    private class Iter implements Iterator<Runnable> {
179
180        @Override
181        public boolean hasNext() {
182            return false;
183        }
184
185        @Override
186        public Runnable next() {
187            throw new NoSuchElementException();
188        }
189
190        @Override
191        public void remove() {
192            throw new IllegalStateException();
193        }
194    }
195
196    @Override
197    public int drainTo(Collection<? super Runnable> c) {
198        return drainTo(c, Integer.MAX_VALUE);
199    }
200
201    @Override
202    public int drainTo(Collection<? super Runnable> c, int maxElements) {
203        for (int i = 0; i < maxElements; i++) {
204            Runnable r = poll();
205            if (r == null) {
206                return i;
207            }
208            c.add(r);
209        }
210        return maxElements;
211    }
212
213    /**
214     * Gets the size of the queue.
215     */
216    public abstract int getQueueSize();
217
218    /**
219     * Adds an element into this queue, waiting if necessary for space to become available.
220     */
221    public abstract void putElement(Runnable r) throws InterruptedException;
222
223    /**
224     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
225     */
226    public abstract Runnable pollElement();
227
228}