001/* 002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Benoit Delbosc 016 * Florent Guillaume 017 */ 018package org.nuxeo.ecm.core.work; 019 020import java.util.HashSet; 021import java.util.Iterator; 022import java.util.Set; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.ReentrantLock; 027 028import org.nuxeo.ecm.core.work.api.Work; 029 030/** 031 * Memory-based {@link BlockingQueue}. 032 * <p> 033 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are 034 * {@link WorkHolder}s. 035 */ 036public class MemoryBlockingQueue extends NuxeoBlockingQueue { 037 038 /** 039 * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls. 040 */ 041 private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> { 042 043 private static final long serialVersionUID = 1L; 044 045 private final ReentrantLock limitedPutLock = new ReentrantLock(); 046 047 private final int limitedCapacity; 048 049 /** 050 * Creates a {@link LinkedBlockingQueue} with a maximum capacity. 051 * <p> 052 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 053 * 054 * @param capacity the capacity, or -1 for unbounded 055 */ 056 public ReentrantLinkedBlockingQueue(int capacity) { 057 // Allocate more space to prevent starvation dead lock 058 // because a worker can add a new job to the queue. 059 super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity)); 060 limitedCapacity = capacity; 061 } 062 063 /** 064 * Block until there are enough remaining capacity to put the entry. 065 */ 066 public void limitedPut(T e) throws InterruptedException { 067 limitedPutLock.lockInterruptibly(); 068 try { 069 while (remainingCapacity() < limitedCapacity) { 070 // TODO replace by wakeup when an element is removed 071 Thread.sleep(100); 072 } 073 put(e); 074 } finally { 075 limitedPutLock.unlock(); 076 } 077 } 078 079 @Override 080 public boolean offer(T e) { 081 if (limitedCapacity < 0) { 082 return super.offer(e); 083 } 084 // turn non-blocking offer into a blocking put 085 try { 086 if (Thread.currentThread().getName().startsWith(WorkManagerImpl.THREAD_PREFIX)) { 087 // use the full queue capacity for reentrant call 088 put(e); 089 } else { 090 // put only if there are enough remaining capacity 091 limitedPut(e); 092 } 093 return true; 094 } catch (InterruptedException ie) { 095 Thread.currentThread().interrupt(); 096 throw new RuntimeException("interrupted", ie); 097 } 098 } 099 } 100 101 protected final MemoryWorkQueuing queuing; 102 103 protected final BlockingQueue<Runnable> queue; 104 105 // @GuardedBy("itself") 106 protected final Set<String> workIds; 107 108 /** 109 * Creates a {@link BlockingQueue} with a maximum capacity. 110 * <p> 111 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 112 * 113 * @param capacity the capacity, or -1 for unbounded 114 */ 115 public MemoryBlockingQueue(MemoryWorkQueuing queuing, int capacity) { 116 this.queuing = queuing; 117 queue = new ReentrantLinkedBlockingQueue<Runnable>(capacity); 118 workIds = new HashSet<String>(); 119 } 120 121 /** 122 * Checks if the queue contains a given work id. 123 * 124 * @param workId the work id 125 * @return {@code true} if the queue contains the work id 126 */ 127 public boolean containsWorkId(String workId) { 128 synchronized (workIds) { 129 return workIds.contains(workId); 130 } 131 } 132 133 private Runnable addWorkId(Runnable r) { 134 if (r instanceof WorkHolder) { 135 WorkHolder wh = (WorkHolder) r; 136 String id = WorkHolder.getWork(wh).getId(); 137 synchronized (workIds) { 138 workIds.add(id); 139 } 140 } 141 return r; 142 } 143 144 private Runnable removeWorkId(Runnable r) { 145 if (r instanceof WorkHolder) { 146 WorkHolder wh = (WorkHolder) r; 147 String id = WorkHolder.getWork(wh).getId(); 148 synchronized (workIds) { 149 workIds.remove(id); 150 } 151 } 152 return r; 153 } 154 155 @Override 156 public int getQueueSize() { 157 return queue.size(); 158 } 159 160 @Override 161 public void putElement(Runnable r) throws InterruptedException { 162 queue.put(r); 163 addWorkId(r); 164 } 165 166 @Override 167 public Runnable pollElement() { 168 Runnable r = queue.poll(); 169 removeWorkId(r); 170 return r; 171 } 172 173 @Override 174 public Runnable take() throws InterruptedException { 175 Runnable r = queue.take(); 176 removeWorkId(r); 177 return r; 178 } 179 180 /* 181 * We can implement iterator, super doesn't have it. 182 */ 183 @Override 184 public Iterator<Runnable> iterator() { 185 return new Itr(queue.iterator()); 186 } 187 188 private class Itr implements Iterator<Runnable> { 189 private Iterator<Runnable> it; 190 191 private Runnable last; 192 193 public Itr(Iterator<Runnable> it) { 194 this.it = it; 195 } 196 197 @Override 198 public boolean hasNext() { 199 return it.hasNext(); 200 } 201 202 @Override 203 public Runnable next() { 204 return last = it.next(); 205 } 206 207 @Override 208 public void remove() { 209 it.remove(); 210 removeWorkId(last); 211 } 212 } 213 214 @Override 215 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 216 long nanos = unit.toNanos(timeout); 217 nanos = awaitActivation(nanos); 218 if (nanos <= 0) { 219 return null; 220 } 221 return queue.poll(nanos, TimeUnit.NANOSECONDS); 222 } 223 224}