001/*
002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.locks.ReentrantLock;
033import java.util.stream.Collectors;
034
035import org.nuxeo.ecm.core.work.api.Work;
036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
037
038/**
039 * Memory-based {@link BlockingQueue}.
040 * <p>
041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are
042 * {@link WorkHolder}s.
043 */
044public class MemoryBlockingQueue extends NuxeoBlockingQueue {
045
046    /**
047     * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls.
048     */
049    private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
050
051        private static final long serialVersionUID = 1L;
052
053        private final ReentrantLock limitedPutLock = new ReentrantLock();
054
055        private final int limitedCapacity;
056
057        /**
058         * Creates a {@link LinkedBlockingQueue} with a maximum capacity.
059         * <p>
060         * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
061         *
062         * @param capacity the capacity, or -1 for unbounded
063         */
064        public ReentrantLinkedBlockingQueue(int capacity) {
065            // Allocate more space to prevent starvation dead lock
066            // because a worker can add a new job to the queue.
067            super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity));
068            limitedCapacity = capacity;
069        }
070
071        /**
072         * Block until there are enough remaining capacity to put the entry.
073         */
074        public void limitedPut(T e) throws InterruptedException {
075            limitedPutLock.lockInterruptibly();
076            try {
077                while (remainingCapacity() < limitedCapacity) {
078                    // TODO replace by wakeup when an element is removed
079                    Thread.sleep(100);
080                }
081                put(e);
082            } finally {
083                limitedPutLock.unlock();
084            }
085        }
086
087        @Override
088        public boolean offer(T e) {
089            if (limitedCapacity < 0) {
090                return super.offer(e);
091            }
092            // turn non-blocking offer into a blocking put
093            try {
094                if (Thread.currentThread()
095                        .getName()
096                        .startsWith(WorkManagerImpl.THREAD_PREFIX)) {
097                    // use the full queue capacity for reentrant call
098                    put(e);
099                } else {
100                    // put only if there are enough remaining capacity
101                    limitedPut(e);
102                }
103                return true;
104            } catch (InterruptedException ie) {
105                Thread.currentThread()
106                        .interrupt();
107                throw new RuntimeException("interrupted", ie);
108            }
109        }
110    }
111
112    protected final BlockingQueue<Runnable> queue;
113
114    protected final Map<String, Work> works = new HashMap<>();
115
116    protected final Set<String> scheduledWorks = new HashSet<>();
117
118    protected final Set<String> runningWorks = new HashSet<>();
119
120    long scheduledCount;
121    long runningCount;
122    long completedCount;
123    long cancelledCount;
124
125    /**
126     * Creates a {@link BlockingQueue} with a maximum capacity.
127     * <p>
128     * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
129     *
130     * @param capacity the capacity, or -1 for unbounded
131     */
132    public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) {
133        super(id, queuing);
134        queue = new ReentrantLinkedBlockingQueue<>(capacity);
135    }
136
137    @Override
138    synchronized protected WorkQueueMetrics metrics() {
139        return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount);
140    }
141
142    @Override
143    public int getQueueSize() {
144        return queue.size();
145    }
146
147    @Override
148    public void putElement(Runnable r) throws InterruptedException {
149        queue.put(r);
150    }
151
152    @Override
153    public Runnable pollElement() {
154        Runnable r = queue.poll();
155        return r;
156    }
157
158    @Override
159    public Runnable take() throws InterruptedException {
160        Runnable r = queue.take();
161        return r;
162    }
163
164    @Override
165    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
166        long nanos = unit.toNanos(timeout);
167        nanos = awaitActivation(nanos);
168        if (nanos <= 0) {
169            return null;
170        }
171        return queue.poll(nanos, TimeUnit.NANOSECONDS);
172    }
173
174    synchronized WorkQueueMetrics workSchedule(Work work) {
175        String id = work.getId();
176        if (scheduledWorks.contains(id)) {
177            return metrics();
178        }
179        if (!offer(new WorkHolder(work))) {
180            return metrics();
181        }
182        works.put(id, work);
183        scheduledWorks.add(id);
184        scheduledCount += 1;
185        return metrics();
186    }
187
188    synchronized WorkQueueMetrics workRunning(Work work) {
189        String id = work.getId();
190        scheduledWorks.remove(id);
191        works.put(id, work); // update state
192        runningWorks.add(id);
193        scheduledCount -= 1;
194        runningCount += 1;
195        return metrics();
196    }
197
198    synchronized WorkQueueMetrics workCanceled(Work work) {
199        String id = work.getId();
200        for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) {
201            if (id.equals(WorkHolder.getWork(it.next())
202                    .getId())) {
203                it.remove();
204                scheduledWorks.remove(id);
205                works.remove(id);
206                scheduledCount -= 1;
207                cancelledCount +=1 ;
208                break;
209            }
210        }
211        return metrics();
212    }
213
214    synchronized WorkQueueMetrics workCompleted(Work work) {
215        String id = work.getId();
216        if (runningWorks.remove(id) && !scheduledWorks.contains(id)) {
217            works.remove(id);
218        }
219        runningCount -= 1;
220        completedCount += 1;
221        return metrics();
222    }
223
224    synchronized WorkQueueMetrics workRescheduleRunning(Work work) {
225        String id = work.getId();
226        if (!runningWorks.remove(id)) {
227            return metrics();
228        }
229        works.remove(id);
230        runningCount -= 1;
231        return workSchedule(work);
232    }
233
234    synchronized Work lookup(String workId) {
235        return works.get(workId);
236    }
237
238    synchronized List<Work> list() {
239        return new ArrayList<>(works.values());
240    }
241
242    synchronized List<String> keys() {
243        return new ArrayList<>(works.keySet());
244    }
245
246    synchronized List<Work> listScheduled() {
247        return scheduledWorks.stream()
248                .map(works::get)
249                .collect(Collectors.toList());
250    }
251
252    synchronized List<String> scheduledKeys() {
253        return new ArrayList<>(scheduledWorks);
254    }
255
256    synchronized List<Work> listRunning() {
257        return runningWorks.stream()
258                .map(works::get)
259                .collect(Collectors.toList());
260    }
261
262    synchronized List<String> runningKeys() {
263        return new ArrayList<>(runningWorks);
264    }
265
266}