001/* 002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.stream.Collectors; 034 035import org.nuxeo.ecm.core.work.api.Work; 036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 037 038/** 039 * Memory-based {@link BlockingQueue}. 040 * <p> 041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are 042 * {@link WorkHolder}s. 043 */ 044public class MemoryBlockingQueue extends NuxeoBlockingQueue { 045 046 /** 047 * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls. 048 */ 049 private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> { 050 051 private static final long serialVersionUID = 1L; 052 053 private final ReentrantLock limitedPutLock = new ReentrantLock(); 054 055 private final int limitedCapacity; 056 057 /** 058 * Creates a {@link LinkedBlockingQueue} with a maximum capacity. 059 * <p> 060 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 061 * 062 * @param capacity the capacity, or -1 for unbounded 063 */ 064 public ReentrantLinkedBlockingQueue(int capacity) { 065 // Allocate more space to prevent starvation dead lock 066 // because a worker can add a new job to the queue. 067 super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity)); 068 limitedCapacity = capacity; 069 } 070 071 /** 072 * Block until there are enough remaining capacity to put the entry. 073 */ 074 public void limitedPut(T e) throws InterruptedException { 075 limitedPutLock.lockInterruptibly(); 076 try { 077 while (remainingCapacity() < limitedCapacity) { 078 // TODO replace by wakeup when an element is removed 079 Thread.sleep(100); 080 } 081 put(e); 082 } finally { 083 limitedPutLock.unlock(); 084 } 085 } 086 087 @Override 088 public boolean offer(T e) { 089 if (limitedCapacity < 0) { 090 return super.offer(e); 091 } 092 // turn non-blocking offer into a blocking put 093 try { 094 if (Thread.currentThread() 095 .getName() 096 .startsWith(WorkManagerImpl.THREAD_PREFIX)) { 097 // use the full queue capacity for reentrant call 098 put(e); 099 } else { 100 // put only if there are enough remaining capacity 101 limitedPut(e); 102 } 103 return true; 104 } catch (InterruptedException ie) { 105 Thread.currentThread() 106 .interrupt(); 107 throw new RuntimeException("interrupted", ie); 108 } 109 } 110 } 111 112 protected final BlockingQueue<Runnable> queue; 113 114 protected final Map<String, Work> works = new HashMap<>(); 115 116 protected final Set<String> scheduledWorks = new HashSet<>(); 117 118 protected final Set<String> runningWorks = new HashSet<>(); 119 120 long scheduledCount; 121 long runningCount; 122 long completedCount; 123 long cancelledCount; 124 125 /** 126 * Creates a {@link BlockingQueue} with a maximum capacity. 127 * <p> 128 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 129 * 130 * @param capacity the capacity, or -1 for unbounded 131 */ 132 public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) { 133 super(id, queuing); 134 queue = new ReentrantLinkedBlockingQueue<>(capacity); 135 } 136 137 @Override 138 synchronized protected WorkQueueMetrics metrics() { 139 return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount); 140 } 141 142 @Override 143 public int getQueueSize() { 144 return queue.size(); 145 } 146 147 @Override 148 public void putElement(Runnable r) throws InterruptedException { 149 queue.put(r); 150 } 151 152 @Override 153 public Runnable pollElement() { 154 Runnable r = queue.poll(); 155 return r; 156 } 157 158 @Override 159 public Runnable take() throws InterruptedException { 160 Runnable r = queue.take(); 161 if (anotherWorkIsAlreadyRunning(r)) { 162 // reschedule the work so it does not run concurrently 163 offer(r); 164 // take a break we don't want to take too much CPU looping on the same message. 165 Thread.sleep(100); 166 return null; 167 } 168 return r; 169 } 170 171 private boolean anotherWorkIsAlreadyRunning(Runnable r) throws InterruptedException { 172 Work work = WorkHolder.getWork(r); 173 String id = work.getId(); 174 if (runningWorks.contains(id)) { 175 return true; 176 } 177 return false; 178 } 179 180 @Override 181 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 182 long nanos = unit.toNanos(timeout); 183 nanos = awaitActivation(nanos); 184 if (nanos <= 0) { 185 return null; 186 } 187 return queue.poll(nanos, TimeUnit.NANOSECONDS); 188 } 189 190 synchronized WorkQueueMetrics workSchedule(Work work) { 191 String id = work.getId(); 192 if (scheduledWorks.contains(id)) { 193 return metrics(); 194 } 195 if (!offer(new WorkHolder(work))) { 196 return metrics(); 197 } 198 works.put(id, work); 199 scheduledWorks.add(id); 200 scheduledCount += 1; 201 return metrics(); 202 } 203 204 synchronized WorkQueueMetrics workRunning(Work work) { 205 String id = work.getId(); 206 scheduledWorks.remove(id); 207 works.put(id, work); // update state 208 runningWorks.add(id); 209 scheduledCount -= 1; 210 runningCount += 1; 211 return metrics(); 212 } 213 214 synchronized WorkQueueMetrics workCanceled(Work work) { 215 String id = work.getId(); 216 for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) { 217 if (id.equals(WorkHolder.getWork(it.next()) 218 .getId())) { 219 it.remove(); 220 scheduledWorks.remove(id); 221 works.remove(id); 222 scheduledCount -= 1; 223 cancelledCount +=1 ; 224 break; 225 } 226 } 227 return metrics(); 228 } 229 230 synchronized WorkQueueMetrics workCompleted(Work work) { 231 String id = work.getId(); 232 if (runningWorks.remove(id) && !scheduledWorks.contains(id)) { 233 works.remove(id); 234 } 235 runningCount -= 1; 236 completedCount += 1; 237 return metrics(); 238 } 239 240 synchronized WorkQueueMetrics workRescheduleRunning(Work work) { 241 String id = work.getId(); 242 if (!runningWorks.remove(id)) { 243 return metrics(); 244 } 245 works.remove(id); 246 runningCount -= 1; 247 return workSchedule(work); 248 } 249 250 synchronized Work lookup(String workId) { 251 return works.get(workId); 252 } 253 254 synchronized List<Work> list() { 255 return new ArrayList<>(works.values()); 256 } 257 258 synchronized List<String> keys() { 259 return new ArrayList<>(works.keySet()); 260 } 261 262 synchronized List<Work> listScheduled() { 263 return scheduledWorks.stream() 264 .map(works::get) 265 .collect(Collectors.toList()); 266 } 267 268 synchronized List<String> scheduledKeys() { 269 return new ArrayList<>(scheduledWorks); 270 } 271 272 synchronized List<Work> listRunning() { 273 return runningWorks.stream() 274 .map(works::get) 275 .collect(Collectors.toList()); 276 } 277 278 synchronized List<String> runningKeys() { 279 return new ArrayList<>(runningWorks); 280 } 281 282}