001/*
002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.locks.ReentrantLock;
033import java.util.stream.Collectors;
034
035import org.nuxeo.ecm.core.work.api.Work;
036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
037
038/**
039 * Memory-based {@link BlockingQueue}.
040 * <p>
041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are
042 * {@link WorkHolder}s.
043 */
044public class MemoryBlockingQueue extends NuxeoBlockingQueue {
045
046    /**
047     * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls.
048     */
049    private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
050
051        private static final long serialVersionUID = 1L;
052
053        private final ReentrantLock limitedPutLock = new ReentrantLock();
054
055        private final int limitedCapacity;
056
057        /**
058         * Creates a {@link LinkedBlockingQueue} with a maximum capacity.
059         * <p>
060         * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
061         *
062         * @param capacity the capacity, or -1 for unbounded
063         */
064        public ReentrantLinkedBlockingQueue(int capacity) {
065            // Allocate more space to prevent starvation dead lock
066            // because a worker can add a new job to the queue.
067            super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity));
068            limitedCapacity = capacity;
069        }
070
071        /**
072         * Block until there are enough remaining capacity to put the entry.
073         */
074        public void limitedPut(T e) throws InterruptedException {
075            limitedPutLock.lockInterruptibly();
076            try {
077                while (remainingCapacity() < limitedCapacity) {
078                    // TODO replace by wakeup when an element is removed
079                    Thread.sleep(100);
080                }
081                put(e);
082            } finally {
083                limitedPutLock.unlock();
084            }
085        }
086
087        @Override
088        public boolean offer(T e) {
089            if (limitedCapacity < 0) {
090                return super.offer(e);
091            }
092            // turn non-blocking offer into a blocking put
093            try {
094                if (Thread.currentThread()
095                        .getName()
096                        .startsWith(WorkManagerImpl.THREAD_PREFIX)) {
097                    // use the full queue capacity for reentrant call
098                    put(e);
099                } else {
100                    // put only if there are enough remaining capacity
101                    limitedPut(e);
102                }
103                return true;
104            } catch (InterruptedException ie) {
105                Thread.currentThread().interrupt();
106                throw new RuntimeException(ie);
107            }
108        }
109    }
110
111    protected final BlockingQueue<Runnable> queue;
112
113    protected final Map<String, Work> works = new HashMap<>();
114
115    protected final Set<String> scheduledWorks = new HashSet<>();
116
117    protected final Set<String> runningWorks = new HashSet<>();
118
119    long scheduledCount;
120    long runningCount;
121    long completedCount;
122    long cancelledCount;
123
124    /**
125     * Creates a {@link BlockingQueue} with a maximum capacity.
126     * <p>
127     * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
128     *
129     * @param capacity the capacity, or -1 for unbounded
130     */
131    public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) {
132        super(id, queuing);
133        queue = new ReentrantLinkedBlockingQueue<>(capacity);
134    }
135
136    @Override
137    synchronized protected WorkQueueMetrics metrics() {
138        return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount);
139    }
140
141    @Override
142    public int getQueueSize() {
143        return queue.size();
144    }
145
146    @Override
147    public void putElement(Runnable r) throws InterruptedException {
148        queue.put(r);
149    }
150
151    @Override
152    public Runnable pollElement() {
153        Runnable r = queue.poll();
154        return r;
155    }
156
157    @Override
158    public Runnable take() throws InterruptedException {
159        Runnable r = queue.take();
160        if (anotherWorkIsAlreadyRunning(r)) {
161            // reschedule the work so it does not run concurrently
162            offer(r);
163            // take a break we don't want to take too much CPU looping on the same message.
164            Thread.sleep(100);
165            return null;
166        }
167        return r;
168    }
169
170    private boolean anotherWorkIsAlreadyRunning(Runnable r) throws InterruptedException {
171        Work work = WorkHolder.getWork(r);
172        String id = work.getId();
173        if (runningWorks.contains(id)) {
174            return true;
175        }
176        return false;
177    }
178
179    @Override
180    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
181        long nanos = unit.toNanos(timeout);
182        nanos = awaitActivation(nanos);
183        if (nanos <= 0) {
184            return null;
185        }
186        return queue.poll(nanos, TimeUnit.NANOSECONDS);
187    }
188
189    synchronized WorkQueueMetrics workSchedule(Work work) {
190        String id = work.getId();
191        if (scheduledWorks.contains(id)) {
192            return metrics();
193        }
194        if (!offer(new WorkHolder(work))) {
195            return metrics();
196        }
197        works.put(id, work);
198        scheduledWorks.add(id);
199        scheduledCount += 1;
200        return metrics();
201    }
202
203    synchronized WorkQueueMetrics workRunning(Work work) {
204        String id = work.getId();
205        scheduledWorks.remove(id);
206        works.put(id, work); // update state
207        runningWorks.add(id);
208        scheduledCount -= 1;
209        runningCount += 1;
210        return metrics();
211    }
212
213    synchronized WorkQueueMetrics workCanceled(Work work) {
214        String id = work.getId();
215        for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) {
216            if (id.equals(WorkHolder.getWork(it.next())
217                    .getId())) {
218                it.remove();
219                scheduledWorks.remove(id);
220                works.remove(id);
221                scheduledCount -= 1;
222                cancelledCount +=1 ;
223                break;
224            }
225        }
226        return metrics();
227    }
228
229    synchronized WorkQueueMetrics workCompleted(Work work) {
230        String id = work.getId();
231        if (runningWorks.remove(id) && !scheduledWorks.contains(id)) {
232            works.remove(id);
233        }
234        runningCount -= 1;
235        completedCount += 1;
236        return metrics();
237    }
238
239    synchronized WorkQueueMetrics workRescheduleRunning(Work work) {
240        String id = work.getId();
241        if (!runningWorks.remove(id)) {
242            return metrics();
243        }
244        works.remove(id);
245        runningCount -= 1;
246        return workSchedule(work);
247    }
248
249    synchronized Work lookup(String workId) {
250        return works.get(workId);
251    }
252
253    synchronized List<Work> list() {
254        return new ArrayList<>(works.values());
255    }
256
257    synchronized List<String> keys() {
258        return new ArrayList<>(works.keySet());
259    }
260
261    synchronized List<Work> listScheduled() {
262        return scheduledWorks.stream()
263                .map(works::get)
264                .collect(Collectors.toList());
265    }
266
267    synchronized List<String> scheduledKeys() {
268        return new ArrayList<>(scheduledWorks);
269    }
270
271    synchronized List<Work> listRunning() {
272        return runningWorks.stream()
273                .map(works::get)
274                .collect(Collectors.toList());
275    }
276
277    synchronized List<String> runningKeys() {
278        return new ArrayList<>(runningWorks);
279    }
280
281}