001/*
002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.locks.ReentrantLock;
033import java.util.stream.Collectors;
034
035import org.nuxeo.ecm.core.work.api.Work;
036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
037
038/**
039 * Memory-based {@link BlockingQueue}.
040 * <p>
041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are
042 * {@link WorkHolder}s.
043 */
044public class MemoryBlockingQueue extends NuxeoBlockingQueue {
045
046    /**
047     * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls.
048     */
049    private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
050
051        private static final long serialVersionUID = 1L;
052
053        private final ReentrantLock limitedPutLock = new ReentrantLock();
054
055        private final int limitedCapacity;
056
057        /**
058         * Creates a {@link LinkedBlockingQueue} with a maximum capacity.
059         * <p>
060         * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
061         *
062         * @param capacity the capacity, or -1 for unbounded
063         */
064        public ReentrantLinkedBlockingQueue(int capacity) {
065            // Allocate more space to prevent starvation dead lock
066            // because a worker can add a new job to the queue.
067            super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity));
068            limitedCapacity = capacity;
069        }
070
071        /**
072         * Block until there are enough remaining capacity to put the entry.
073         */
074        public void limitedPut(T e) throws InterruptedException {
075            limitedPutLock.lockInterruptibly();
076            try {
077                while (remainingCapacity() < limitedCapacity) {
078                    // TODO replace by wakeup when an element is removed
079                    Thread.sleep(100);
080                }
081                put(e);
082            } finally {
083                limitedPutLock.unlock();
084            }
085        }
086
087        @Override
088        public boolean offer(T e) {
089            if (limitedCapacity < 0) {
090                return super.offer(e);
091            }
092            // turn non-blocking offer into a blocking put
093            try {
094                if (Thread.currentThread()
095                        .getName()
096                        .startsWith(WorkManagerImpl.THREAD_PREFIX)) {
097                    // use the full queue capacity for reentrant call
098                    put(e);
099                } else {
100                    // put only if there are enough remaining capacity
101                    limitedPut(e);
102                }
103                return true;
104            } catch (InterruptedException ie) {
105                Thread.currentThread()
106                        .interrupt();
107                throw new RuntimeException("interrupted", ie);
108            }
109        }
110    }
111
112    protected final BlockingQueue<Runnable> queue;
113
114    protected final Map<String, Work> works = new HashMap<>();
115
116    protected final Set<String> scheduledWorks = new HashSet<>();
117
118    protected final Set<String> runningWorks = new HashSet<>();
119
120    long scheduledCount;
121    long runningCount;
122    long completedCount;
123    long cancelledCount;
124
125    /**
126     * Creates a {@link BlockingQueue} with a maximum capacity.
127     * <p>
128     * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
129     *
130     * @param capacity the capacity, or -1 for unbounded
131     */
132    public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) {
133        super(id, queuing);
134        queue = new ReentrantLinkedBlockingQueue<>(capacity);
135    }
136
137    @Override
138    synchronized protected WorkQueueMetrics metrics() {
139        return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount);
140    }
141
142    @Override
143    public int getQueueSize() {
144        return queue.size();
145    }
146
147    @Override
148    public void putElement(Runnable r) throws InterruptedException {
149        queue.put(r);
150    }
151
152    @Override
153    public Runnable pollElement() {
154        Runnable r = queue.poll();
155        return r;
156    }
157
158    @Override
159    public Runnable take() throws InterruptedException {
160        Runnable r = queue.take();
161        if (anotherWorkIsAlreadyRunning(r)) {
162            // reschedule the work so it does not run concurrently
163            offer(r);
164            // take a break we don't want to take too much CPU looping on the same message.
165            Thread.sleep(100);
166            return null;
167        }
168        return r;
169    }
170
171    private boolean anotherWorkIsAlreadyRunning(Runnable r) throws InterruptedException {
172        Work work = WorkHolder.getWork(r);
173        String id = work.getId();
174        if (runningWorks.contains(id)) {
175            return true;
176        }
177        return false;
178    }
179
180    @Override
181    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
182        long nanos = unit.toNanos(timeout);
183        nanos = awaitActivation(nanos);
184        if (nanos <= 0) {
185            return null;
186        }
187        return queue.poll(nanos, TimeUnit.NANOSECONDS);
188    }
189
190    synchronized WorkQueueMetrics workSchedule(Work work) {
191        String id = work.getId();
192        if (scheduledWorks.contains(id)) {
193            return metrics();
194        }
195        if (!offer(new WorkHolder(work))) {
196            return metrics();
197        }
198        works.put(id, work);
199        scheduledWorks.add(id);
200        scheduledCount += 1;
201        return metrics();
202    }
203
204    synchronized WorkQueueMetrics workRunning(Work work) {
205        String id = work.getId();
206        scheduledWorks.remove(id);
207        works.put(id, work); // update state
208        runningWorks.add(id);
209        scheduledCount -= 1;
210        runningCount += 1;
211        return metrics();
212    }
213
214    synchronized WorkQueueMetrics workCanceled(Work work) {
215        String id = work.getId();
216        for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) {
217            if (id.equals(WorkHolder.getWork(it.next())
218                    .getId())) {
219                it.remove();
220                scheduledWorks.remove(id);
221                works.remove(id);
222                scheduledCount -= 1;
223                cancelledCount +=1 ;
224                break;
225            }
226        }
227        return metrics();
228    }
229
230    synchronized WorkQueueMetrics workCompleted(Work work) {
231        String id = work.getId();
232        if (runningWorks.remove(id) && !scheduledWorks.contains(id)) {
233            works.remove(id);
234        }
235        runningCount -= 1;
236        completedCount += 1;
237        return metrics();
238    }
239
240    synchronized WorkQueueMetrics workRescheduleRunning(Work work) {
241        String id = work.getId();
242        if (!runningWorks.remove(id)) {
243            return metrics();
244        }
245        works.remove(id);
246        runningCount -= 1;
247        return workSchedule(work);
248    }
249
250    synchronized Work lookup(String workId) {
251        return works.get(workId);
252    }
253
254    synchronized List<Work> list() {
255        return new ArrayList<>(works.values());
256    }
257
258    synchronized List<String> keys() {
259        return new ArrayList<>(works.keySet());
260    }
261
262    synchronized List<Work> listScheduled() {
263        return scheduledWorks.stream()
264                .map(works::get)
265                .collect(Collectors.toList());
266    }
267
268    synchronized List<String> scheduledKeys() {
269        return new ArrayList<>(scheduledWorks);
270    }
271
272    synchronized List<Work> listRunning() {
273        return runningWorks.stream()
274                .map(works::get)
275                .collect(Collectors.toList());
276    }
277
278    synchronized List<String> runningKeys() {
279        return new ArrayList<>(runningWorks);
280    }
281
282}