001/*
002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Set;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.nuxeo.ecm.core.work.api.Work;
031
032/**
033 * Memory-based {@link BlockingQueue}.
034 * <p>
035 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are
036 * {@link WorkHolder}s.
037 */
038public class MemoryBlockingQueue extends NuxeoBlockingQueue {
039
040    /**
041     * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls.
042     */
043    private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
044
045        private static final long serialVersionUID = 1L;
046
047        private final ReentrantLock limitedPutLock = new ReentrantLock();
048
049        private final int limitedCapacity;
050
051        /**
052         * Creates a {@link LinkedBlockingQueue} with a maximum capacity.
053         * <p>
054         * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
055         *
056         * @param capacity the capacity, or -1 for unbounded
057         */
058        public ReentrantLinkedBlockingQueue(int capacity) {
059            // Allocate more space to prevent starvation dead lock
060            // because a worker can add a new job to the queue.
061            super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity));
062            limitedCapacity = capacity;
063        }
064
065        /**
066         * Block until there are enough remaining capacity to put the entry.
067         */
068        public void limitedPut(T e) throws InterruptedException {
069            limitedPutLock.lockInterruptibly();
070            try {
071                while (remainingCapacity() < limitedCapacity) {
072                    // TODO replace by wakeup when an element is removed
073                    Thread.sleep(100);
074                }
075                put(e);
076            } finally {
077                limitedPutLock.unlock();
078            }
079        }
080
081        @Override
082        public boolean offer(T e) {
083            if (limitedCapacity < 0) {
084                return super.offer(e);
085            }
086            // turn non-blocking offer into a blocking put
087            try {
088                if (Thread.currentThread().getName().startsWith(WorkManagerImpl.THREAD_PREFIX)) {
089                    // use the full queue capacity for reentrant call
090                    put(e);
091                } else {
092                    // put only if there are enough remaining capacity
093                    limitedPut(e);
094                }
095                return true;
096            } catch (InterruptedException ie) {
097                Thread.currentThread().interrupt();
098                throw new RuntimeException("interrupted", ie);
099            }
100        }
101    }
102
103    protected final BlockingQueue<Runnable> queue;
104
105    // @GuardedBy("itself")
106    protected final Set<String> workIds;
107
108    /**
109     * Creates a {@link BlockingQueue} with a maximum capacity.
110     * <p>
111     * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}.
112     *
113     * @param capacity the capacity, or -1 for unbounded
114     */
115    public MemoryBlockingQueue(int capacity) {
116        queue = new ReentrantLinkedBlockingQueue<>(capacity);
117        workIds = new HashSet<>();
118    }
119
120    /**
121     * Checks if the queue contains a given work id.
122     *
123     * @param workId the work id
124     * @return {@code true} if the queue contains the work id
125     */
126    public boolean containsWorkId(String workId) {
127        synchronized (workIds) {
128            return workIds.contains(workId);
129        }
130    }
131
132    private Runnable addWorkId(Runnable r) {
133        if (r instanceof WorkHolder) {
134            WorkHolder wh = (WorkHolder) r;
135            String id = WorkHolder.getWork(wh).getId();
136            synchronized (workIds) {
137                workIds.add(id);
138            }
139        }
140        return r;
141    }
142
143    private Runnable removeWorkId(Runnable r) {
144        if (r instanceof WorkHolder) {
145            WorkHolder wh = (WorkHolder) r;
146            String id = WorkHolder.getWork(wh).getId();
147            synchronized (workIds) {
148                workIds.remove(id);
149            }
150        }
151        return r;
152    }
153
154    @Override
155    public int getQueueSize() {
156        return queue.size();
157    }
158
159    @Override
160    public void putElement(Runnable r) throws InterruptedException {
161        queue.put(r);
162        addWorkId(r);
163    }
164
165    @Override
166    public Runnable pollElement() {
167        Runnable r = queue.poll();
168        removeWorkId(r);
169        return r;
170    }
171
172    @Override
173    public Runnable take() throws InterruptedException {
174        Runnable r = queue.take();
175        removeWorkId(r);
176        return r;
177    }
178
179    /*
180     * We can implement iterator, super doesn't have it.
181     */
182    @Override
183    public Iterator<Runnable> iterator() {
184        return new Itr(queue.iterator());
185    }
186
187    private class Itr implements Iterator<Runnable> {
188        private Iterator<Runnable> it;
189
190        private Runnable last;
191
192        public Itr(Iterator<Runnable> it) {
193            this.it = it;
194        }
195
196        @Override
197        public boolean hasNext() {
198            return it.hasNext();
199        }
200
201        @Override
202        public Runnable next() {
203            return last = it.next();
204        }
205
206        @Override
207        public void remove() {
208            it.remove();
209            removeWorkId(last);
210        }
211    }
212
213    @Override
214    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
215        long nanos = unit.toNanos(timeout);
216        nanos = awaitActivation(nanos);
217        if (nanos <= 0) {
218            return null;
219        }
220        return queue.poll(nanos, TimeUnit.NANOSECONDS);
221    }
222
223}