001/* 002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.stream.Collectors; 034 035import org.nuxeo.ecm.core.work.api.Work; 036import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 037 038/** 039 * Memory-based {@link BlockingQueue}. 040 * <p> 041 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are 042 * {@link WorkHolder}s. 043 */ 044public class MemoryBlockingQueue extends NuxeoBlockingQueue { 045 046 /** 047 * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls. 048 */ 049 private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> { 050 051 private static final long serialVersionUID = 1L; 052 053 private final ReentrantLock limitedPutLock = new ReentrantLock(); 054 055 private final int limitedCapacity; 056 057 /** 058 * Creates a {@link LinkedBlockingQueue} with a maximum capacity. 059 * <p> 060 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 061 * 062 * @param capacity the capacity, or -1 for unbounded 063 */ 064 public ReentrantLinkedBlockingQueue(int capacity) { 065 // Allocate more space to prevent starvation dead lock 066 // because a worker can add a new job to the queue. 067 super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity)); 068 limitedCapacity = capacity; 069 } 070 071 /** 072 * Block until there are enough remaining capacity to put the entry. 073 */ 074 public void limitedPut(T e) throws InterruptedException { 075 limitedPutLock.lockInterruptibly(); 076 try { 077 while (remainingCapacity() < limitedCapacity) { 078 // TODO replace by wakeup when an element is removed 079 Thread.sleep(100); 080 } 081 put(e); 082 } finally { 083 limitedPutLock.unlock(); 084 } 085 } 086 087 @Override 088 public boolean offer(T e) { 089 if (limitedCapacity < 0) { 090 return super.offer(e); 091 } 092 // turn non-blocking offer into a blocking put 093 try { 094 if (Thread.currentThread() 095 .getName() 096 .startsWith(WorkManagerImpl.THREAD_PREFIX)) { 097 // use the full queue capacity for reentrant call 098 put(e); 099 } else { 100 // put only if there are enough remaining capacity 101 limitedPut(e); 102 } 103 return true; 104 } catch (InterruptedException ie) { 105 Thread.currentThread() 106 .interrupt(); 107 throw new RuntimeException("interrupted", ie); 108 } 109 } 110 } 111 112 protected final BlockingQueue<Runnable> queue; 113 114 protected final Map<String, Work> works = new HashMap<>(); 115 116 protected final Set<String> scheduledWorks = new HashSet<>(); 117 118 protected final Set<String> runningWorks = new HashSet<>(); 119 120 long scheduledCount; 121 long runningCount; 122 long completedCount; 123 long cancelledCount; 124 125 /** 126 * Creates a {@link BlockingQueue} with a maximum capacity. 127 * <p> 128 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 129 * 130 * @param capacity the capacity, or -1 for unbounded 131 */ 132 public MemoryBlockingQueue(String id, MemoryWorkQueuing queuing, int capacity) { 133 super(id, queuing); 134 queue = new ReentrantLinkedBlockingQueue<>(capacity); 135 } 136 137 @Override 138 synchronized protected WorkQueueMetrics metrics() { 139 return new WorkQueueMetrics(queueId, scheduledCount, runningCount, completedCount, cancelledCount); 140 } 141 142 @Override 143 public int getQueueSize() { 144 return queue.size(); 145 } 146 147 @Override 148 public void putElement(Runnable r) throws InterruptedException { 149 queue.put(r); 150 } 151 152 @Override 153 public Runnable pollElement() { 154 Runnable r = queue.poll(); 155 return r; 156 } 157 158 @Override 159 public Runnable take() throws InterruptedException { 160 Runnable r = queue.take(); 161 return r; 162 } 163 164 @Override 165 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 166 long nanos = unit.toNanos(timeout); 167 nanos = awaitActivation(nanos); 168 if (nanos <= 0) { 169 return null; 170 } 171 return queue.poll(nanos, TimeUnit.NANOSECONDS); 172 } 173 174 synchronized WorkQueueMetrics workSchedule(Work work) { 175 String id = work.getId(); 176 if (scheduledWorks.contains(id)) { 177 return metrics(); 178 } 179 if (!offer(new WorkHolder(work))) { 180 return metrics(); 181 } 182 works.put(id, work); 183 scheduledWorks.add(id); 184 scheduledCount += 1; 185 return metrics(); 186 } 187 188 synchronized WorkQueueMetrics workRunning(Work work) { 189 String id = work.getId(); 190 scheduledWorks.remove(id); 191 works.put(id, work); // update state 192 runningWorks.add(id); 193 scheduledCount -= 1; 194 runningCount += 1; 195 return metrics(); 196 } 197 198 synchronized WorkQueueMetrics workCanceled(Work work) { 199 String id = work.getId(); 200 for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) { 201 if (id.equals(WorkHolder.getWork(it.next()) 202 .getId())) { 203 it.remove(); 204 scheduledWorks.remove(id); 205 works.remove(id); 206 scheduledCount -= 1; 207 cancelledCount +=1 ; 208 break; 209 } 210 } 211 return metrics(); 212 } 213 214 synchronized WorkQueueMetrics workCompleted(Work work) { 215 String id = work.getId(); 216 if (runningWorks.remove(id) && !scheduledWorks.contains(id)) { 217 works.remove(id); 218 } 219 runningCount -= 1; 220 completedCount += 1; 221 return metrics(); 222 } 223 224 synchronized WorkQueueMetrics workRescheduleRunning(Work work) { 225 String id = work.getId(); 226 if (!runningWorks.remove(id)) { 227 return metrics(); 228 } 229 works.remove(id); 230 runningCount -= 1; 231 return workSchedule(work); 232 } 233 234 synchronized Work lookup(String workId) { 235 return works.get(workId); 236 } 237 238 synchronized List<Work> list() { 239 return new ArrayList<>(works.values()); 240 } 241 242 synchronized List<String> keys() { 243 return new ArrayList<>(works.keySet()); 244 } 245 246 synchronized List<Work> listScheduled() { 247 return scheduledWorks.stream() 248 .map(works::get) 249 .collect(Collectors.toList()); 250 } 251 252 synchronized List<String> scheduledKeys() { 253 return new ArrayList<>(scheduledWorks); 254 } 255 256 synchronized List<Work> listRunning() { 257 return runningWorks.stream() 258 .map(works::get) 259 .collect(Collectors.toList()); 260 } 261 262 synchronized List<String> runningKeys() { 263 return new ArrayList<>(runningWorks); 264 } 265 266}