001/*
002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Benoit Delbosc
016 *     Florent Guillaume
017 */
018package org.nuxeo.ecm.core.work;
019
020import java.util.HashSet;
021import java.util.Iterator;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.ReentrantLock;
027
028import org.nuxeo.ecm.core.work.api.Work;
029
030/**
031 * Memory-based {@link BlockingQueue}.
032 * <p>
033 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are
034 * {@link WorkHolder}s.
035 */
036public class MemoryBlockingQueue extends NuxeoBlockingQueue {
037
038    /**
039     * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls.
040     */
041    private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
042
043        private static final long serialVersionUID = 1L;
044
045        private final ReentrantLock limitedPutLock = new ReentrantLock();
046
047        private final int limitedCapacity;
048
049        /**
050         * Creates a {@link LinkedBlockingQueue} with a maximum capacity.
051         * <p>
052         * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
053         *
054         * @param capacity the capacity, or -1 for unbounded
055         */
056        public ReentrantLinkedBlockingQueue(int capacity) {
057            // Allocate more space to prevent starvation dead lock
058            // because a worker can add a new job to the queue.
059            super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity));
060            limitedCapacity = capacity;
061        }
062
063        /**
064         * Block until there are enough remaining capacity to put the entry.
065         */
066        public void limitedPut(T e) throws InterruptedException {
067            limitedPutLock.lockInterruptibly();
068            try {
069                while (remainingCapacity() < limitedCapacity) {
070                    // TODO replace by wakeup when an element is removed
071                    Thread.sleep(100);
072                }
073                put(e);
074            } finally {
075                limitedPutLock.unlock();
076            }
077        }
078
079        @Override
080        public boolean offer(T e) {
081            if (limitedCapacity < 0) {
082                return super.offer(e);
083            }
084            // turn non-blocking offer into a blocking put
085            try {
086                if (Thread.currentThread().getName().startsWith(WorkManagerImpl.THREAD_PREFIX)) {
087                    // use the full queue capacity for reentrant call
088                    put(e);
089                } else {
090                    // put only if there are enough remaining capacity
091                    limitedPut(e);
092                }
093                return true;
094            } catch (InterruptedException ie) {
095                Thread.currentThread().interrupt();
096                throw new RuntimeException("interrupted", ie);
097            }
098        }
099    }
100
101    protected final MemoryWorkQueuing queuing;
102
103    protected final BlockingQueue<Runnable> queue;
104
105    // @GuardedBy("itself")
106    protected final Set<String> workIds;
107
108    /**
109     * Creates a {@link BlockingQueue} with a maximum capacity.
110     * <p>
111     * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
112     *
113     * @param capacity the capacity, or -1 for unbounded
114     */
115    public MemoryBlockingQueue(MemoryWorkQueuing queuing, int capacity) {
116        this.queuing = queuing;
117        queue = new ReentrantLinkedBlockingQueue<Runnable>(capacity);
118        workIds = new HashSet<String>();
119    }
120
121    /**
122     * Checks if the queue contains a given work id.
123     *
124     * @param workId the work id
125     * @return {@code true} if the queue contains the work id
126     */
127    public boolean containsWorkId(String workId) {
128        synchronized (workIds) {
129            return workIds.contains(workId);
130        }
131    }
132
133    private Runnable addWorkId(Runnable r) {
134        if (r instanceof WorkHolder) {
135            WorkHolder wh = (WorkHolder) r;
136            String id = WorkHolder.getWork(wh).getId();
137            synchronized (workIds) {
138                workIds.add(id);
139            }
140        }
141        return r;
142    }
143
144    private Runnable removeWorkId(Runnable r) {
145        if (r instanceof WorkHolder) {
146            WorkHolder wh = (WorkHolder) r;
147            String id = WorkHolder.getWork(wh).getId();
148            synchronized (workIds) {
149                workIds.remove(id);
150            }
151        }
152        return r;
153    }
154
155    @Override
156    public int getQueueSize() {
157        return queue.size();
158    }
159
160    @Override
161    public void putElement(Runnable r) throws InterruptedException {
162        queue.put(r);
163        addWorkId(r);
164    }
165
166    @Override
167    public Runnable pollElement() {
168        Runnable r = queue.poll();
169        removeWorkId(r);
170        return r;
171    }
172
173    @Override
174    public Runnable take() throws InterruptedException {
175        Runnable r = queue.take();
176        removeWorkId(r);
177        return r;
178    }
179
180    /*
181     * We can implement iterator, super doesn't have it.
182     */
183    @Override
184    public Iterator<Runnable> iterator() {
185        return new Itr(queue.iterator());
186    }
187
188    private class Itr implements Iterator<Runnable> {
189        private Iterator<Runnable> it;
190
191        private Runnable last;
192
193        public Itr(Iterator<Runnable> it) {
194            this.it = it;
195        }
196
197        @Override
198        public boolean hasNext() {
199            return it.hasNext();
200        }
201
202        @Override
203        public Runnable next() {
204            return last = it.next();
205        }
206
207        @Override
208        public void remove() {
209            it.remove();
210            removeWorkId(last);
211        }
212    }
213
214    @Override
215    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
216        long nanos = unit.toNanos(timeout);
217        nanos = awaitActivation(nanos);
218        if (nanos <= 0) {
219            return null;
220        }
221        return queue.poll(nanos, TimeUnit.NANOSECONDS);
222    }
223
224}