001/* 002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.stream.Collectors; 034 035import org.nuxeo.ecm.core.work.api.Work; 036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 037 038/** 039 * Memory-based {@link BlockingQueue}. 040 * <p> 041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are 042 * {@link WorkHolder}s. 043 */ 044public class MemoryBlockingQueue extends NuxeoBlockingQueue { 045 046 /** 047 * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls. 048 */ 049 private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> { 050 051 private static final long serialVersionUID = 1L; 052 053 private final ReentrantLock limitedPutLock = new ReentrantLock(); 054 055 private final int limitedCapacity; 056 057 /** 058 * Creates a {@link LinkedBlockingQueue} with a maximum capacity. 059 * <p> 060 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 061 * 062 * @param capacity the capacity, or -1 for unbounded 063 */ 064 public ReentrantLinkedBlockingQueue(int capacity) { 065 // Allocate more space to prevent starvation dead lock 066 // because a worker can add a new job to the queue. 067 super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity)); 068 limitedCapacity = capacity; 069 } 070 071 /** 072 * Block until there are enough remaining capacity to put the entry. 073 */ 074 public void limitedPut(T e) throws InterruptedException { 075 limitedPutLock.lockInterruptibly(); 076 try { 077 while (remainingCapacity() < limitedCapacity) { 078 // TODO replace by wakeup when an element is removed 079 Thread.sleep(100); 080 } 081 put(e); 082 } finally { 083 limitedPutLock.unlock(); 084 } 085 } 086 087 @Override 088 public boolean offer(T e) { 089 if (limitedCapacity < 0) { 090 return super.offer(e); 091 } 092 // turn non-blocking offer into a blocking put 093 try { 094 if (Thread.currentThread() 095 .getName() 096 .startsWith(WorkManagerImpl.THREAD_PREFIX)) { 097 // use the full queue capacity for reentrant call 098 put(e); 099 } else { 100 // put only if there are enough remaining capacity 101 limitedPut(e); 102 } 103 return true; 104 } catch (InterruptedException ie) { 105 Thread.currentThread().interrupt(); 106 throw new RuntimeException(ie); 107 } 108 } 109 } 110 111 protected final BlockingQueue<Runnable> queue; 112 113 protected final Map<String, Work> works = new HashMap<>(); 114 115 protected final Set<String> scheduledWorks = new HashSet<>(); 116 117 protected final Set<String> runningWorks = new HashSet<>(); 118 119 long scheduledCount; 120 long runningCount; 121 long completedCount; 122 long cancelledCount; 123 124 /** 125 * Creates a {@link BlockingQueue} with a maximum capacity. 126 * <p> 127 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 128 * 129 * @param capacity the capacity, or -1 for unbounded 130 */ 131 public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) { 132 super(id, queuing); 133 queue = new ReentrantLinkedBlockingQueue<>(capacity); 134 } 135 136 @Override 137 synchronized protected WorkQueueMetrics metrics() { 138 return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount); 139 } 140 141 @Override 142 public int getQueueSize() { 143 return queue.size(); 144 } 145 146 @Override 147 public void putElement(Runnable r) throws InterruptedException { 148 queue.put(r); 149 } 150 151 @Override 152 public Runnable pollElement() { 153 Runnable r = queue.poll(); 154 return r; 155 } 156 157 @Override 158 public Runnable take() throws InterruptedException { 159 Runnable r = queue.take(); 160 if (anotherWorkIsAlreadyRunning(r)) { 161 // reschedule the work so it does not run concurrently 162 offer(r); 163 // take a break we don't want to take too much CPU looping on the same message. 164 Thread.sleep(100); 165 return null; 166 } 167 return r; 168 } 169 170 private boolean anotherWorkIsAlreadyRunning(Runnable r) throws InterruptedException { 171 Work work = WorkHolder.getWork(r); 172 String id = work.getId(); 173 if (runningWorks.contains(id)) { 174 return true; 175 } 176 return false; 177 } 178 179 @Override 180 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 181 long nanos = unit.toNanos(timeout); 182 nanos = awaitActivation(nanos); 183 if (nanos <= 0) { 184 return null; 185 } 186 return queue.poll(nanos, TimeUnit.NANOSECONDS); 187 } 188 189 synchronized WorkQueueMetrics workSchedule(Work work) { 190 String id = work.getId(); 191 if (scheduledWorks.contains(id)) { 192 return metrics(); 193 } 194 if (!offer(new WorkHolder(work))) { 195 return metrics(); 196 } 197 works.put(id, work); 198 scheduledWorks.add(id); 199 scheduledCount += 1; 200 return metrics(); 201 } 202 203 synchronized WorkQueueMetrics workRunning(Work work) { 204 String id = work.getId(); 205 scheduledWorks.remove(id); 206 works.put(id, work); // update state 207 runningWorks.add(id); 208 scheduledCount -= 1; 209 runningCount += 1; 210 return metrics(); 211 } 212 213 synchronized WorkQueueMetrics workCanceled(Work work) { 214 String id = work.getId(); 215 for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) { 216 if (id.equals(WorkHolder.getWork(it.next()) 217 .getId())) { 218 it.remove(); 219 scheduledWorks.remove(id); 220 works.remove(id); 221 scheduledCount -= 1; 222 cancelledCount +=1 ; 223 break; 224 } 225 } 226 return metrics(); 227 } 228 229 synchronized WorkQueueMetrics workCompleted(Work work) { 230 String id = work.getId(); 231 if (runningWorks.remove(id) && !scheduledWorks.contains(id)) { 232 works.remove(id); 233 } 234 runningCount -= 1; 235 completedCount += 1; 236 return metrics(); 237 } 238 239 synchronized WorkQueueMetrics workRescheduleRunning(Work work) { 240 String id = work.getId(); 241 if (!runningWorks.remove(id)) { 242 return metrics(); 243 } 244 works.remove(id); 245 runningCount -= 1; 246 return workSchedule(work); 247 } 248 249 synchronized Work lookup(String workId) { 250 return works.get(workId); 251 } 252 253 synchronized List<Work> list() { 254 return new ArrayList<>(works.values()); 255 } 256 257 synchronized List<String> keys() { 258 return new ArrayList<>(works.keySet()); 259 } 260 261 synchronized List<Work> listScheduled() { 262 return scheduledWorks.stream() 263 .map(works::get) 264 .collect(Collectors.toList()); 265 } 266 267 synchronized List<String> scheduledKeys() { 268 return new ArrayList<>(scheduledWorks); 269 } 270 271 synchronized List<Work> listRunning() { 272 return runningWorks.stream() 273 .map(works::get) 274 .collect(Collectors.toList()); 275 } 276 277 synchronized List<String> runningKeys() { 278 return new ArrayList<>(runningWorks); 279 } 280 281}