001/* 002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.work; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.BlockingQueue; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.ecm.core.work.api.Work; 032import org.nuxeo.ecm.core.work.api.Work.State; 033import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 034 035/** 036 * Implementation of a {@link WorkQueuing} using in-memory queuing. 037 * 038 * @since 5.8 039 */ 040public class MemoryWorkQueuing implements WorkQueuing { 041 042 private static final Log log = LogFactory.getLog(MemoryWorkQueuing.class); 043 044 protected final WorkManagerImpl mgr; 045 046 // @GuardedBy("this") 047 protected final WorkQueueDescriptorRegistry workQueueDescriptors; 048 049 // @GuardedBy("this") 050 protected final Map<String, BlockingQueue<Runnable>> allScheduled = new HashMap<String, BlockingQueue<Runnable>>(); 051 052 // @GuardedBy("this") 053 // queueId -> workId -> work 054 protected final Map<String, Map<String, Work>> allRunning = new HashMap<String, Map<String, Work>>(); 055 056 // @GuardedBy("this") 057 // queueId -> workId -> work 058 protected final Map<String, Map<String, Work>> allCompleted = new HashMap<String, Map<String, Work>>(); 059 060 public MemoryWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) { 061 this.mgr = mgr; 062 this.workQueueDescriptors = workQueueDescriptors; 063 } 064 065 @Override 066 public synchronized void init() { 067 allScheduled.clear(); 068 allRunning.clear(); 069 allCompleted.clear(); 070 } 071 072 // called synchronized 073 protected WorkQueueDescriptor getDescriptor(String queueId) { 074 WorkQueueDescriptor descriptor = workQueueDescriptors.get(queueId); 075 if (descriptor == null) { 076 throw new IllegalArgumentException("No such work queue: " + queueId); 077 } 078 return descriptor; 079 } 080 081 @Override 082 public BlockingQueue<Runnable> initScheduleQueue(String queueId) { 083 if (allScheduled.containsKey(queueId)) { 084 throw new IllegalStateException(queueId + " was already configured"); 085 } 086 final BlockingQueue<Runnable> queue = newBlockingQueue(getDescriptor(queueId)); 087 allScheduled.put(queueId, queue); 088 return queue; 089 } 090 091 @Override 092 public BlockingQueue<Runnable> getScheduledQueue(String queueId) { 093 if (!allScheduled.containsKey(queueId)) { 094 throw new IllegalStateException(queueId + " was not configured yet"); 095 } 096 return allScheduled.get(queueId); 097 } 098 099 // called synchronized 100 protected Map<String, Work> getRunning(String queueId) { 101 Map<String, Work> running = allRunning.get(queueId); 102 if (running == null) { 103 allRunning.put(queueId, running = newRunningMap()); 104 } 105 return running; 106 } 107 108 // called synchronized 109 protected Map<String, Work> getCompleted(String queueId) { 110 Map<String, Work> completed = allCompleted.get(queueId); 111 if (completed == null) { 112 allCompleted.put(queueId, completed = newCompletedMap()); 113 } 114 return completed; 115 } 116 117 protected BlockingQueue<Runnable> newBlockingQueue(WorkQueueDescriptor workQueueDescriptor) { 118 int capacity = workQueueDescriptor.getCapacity(); 119 if (capacity <= 0) { 120 capacity = -1; // unbounded 121 } 122 return new MemoryBlockingQueue(this, capacity); 123 } 124 125 protected Map<String, Work> newRunningMap() { 126 return new HashMap<String, Work>(); 127 } 128 129 protected Map<String, Work> newCompletedMap() { 130 return new LinkedHashMap<String, Work>(); 131 } 132 133 @Override 134 public synchronized void workRunning(String queueId, Work work) { 135 // work is already taken from the scheduled queue 136 // by the thread pool executor 137 getRunning(queueId).put(work.getId(), work); 138 } 139 140 @Override 141 public synchronized void workCompleted(String queueId, Work work) { 142 getRunning(queueId).remove(work.getId()); 143 getCompleted(queueId).put(work.getId(), work); 144 } 145 146 @Override 147 public Work find(String workId, State state) { 148 if (state == null) { 149 Work w = findScheduled(workId); 150 if (w == null) { 151 w = findRunning(workId); 152 } 153 return w; 154 } 155 switch (state) { 156 case SCHEDULED: 157 return findScheduled(workId); 158 case RUNNING: 159 return findRunning(workId); 160 case COMPLETED: 161 return findCompleted(workId); 162 default: 163 return null; 164 } 165 } 166 167 @Override 168 public boolean isWorkInState(String workId, State state) { 169 if (state == null) { 170 return isScheduled(workId) || isRunning(workId); 171 } 172 switch (state) { 173 case SCHEDULED: 174 return isScheduled(workId); 175 case RUNNING: 176 return isRunning(workId); 177 case COMPLETED: 178 return isCompleted(workId); 179 default: 180 return false; 181 } 182 } 183 184 @Override 185 public State getWorkState(String workId) { 186 // TODO this is linear, but isScheduled is buggy 187 if (findScheduled(workId) != null) { 188 return State.SCHEDULED; 189 } 190 if (isRunning(workId)) { 191 return State.RUNNING; 192 } 193 if (isCompleted(workId)) { 194 return State.COMPLETED; 195 } 196 return null; 197 } 198 199 @Override 200 public synchronized List<Work> listWork(String queueId, State state) { 201 switch (state) { 202 case SCHEDULED: 203 return listScheduled(queueId); 204 case RUNNING: 205 return listRunning(queueId); 206 case COMPLETED: 207 return listCompleted(queueId); 208 default: 209 throw new IllegalArgumentException(String.valueOf(state)); 210 } 211 } 212 213 @Override 214 public synchronized List<String> listWorkIds(String queueId, State state) { 215 if (state == null) { 216 return listNonCompletedIds(queueId); 217 } 218 switch (state) { 219 case SCHEDULED: 220 return listScheduledIds(queueId); 221 case RUNNING: 222 return listRunningIds(queueId); 223 case COMPLETED: 224 return listCompletedIds(queueId); 225 default: 226 throw new IllegalArgumentException(String.valueOf(state)); 227 } 228 } 229 230 @Override 231 public int getQueueSize(String queueId, State state) { 232 switch (state) { 233 case SCHEDULED: 234 return getScheduledSize(queueId); 235 case RUNNING: 236 return getRunningSize(queueId); 237 case COMPLETED: 238 return getCompletedSize(queueId); 239 default: 240 throw new IllegalArgumentException(String.valueOf(state)); 241 } 242 } 243 244 protected synchronized int getScheduledSize(String queueId) { 245 BlockingQueue<Runnable> scheduled = allScheduled.get(queueId); 246 return scheduled == null ? 0 : scheduled.size(); 247 } 248 249 protected synchronized int getRunningSize(String queueId) { 250 Map<String, Work> running = allRunning.get(queueId); 251 return running == null ? 0 : running.size(); 252 } 253 254 protected synchronized int getCompletedSize(String queueId) { 255 Map<String, Work> completed = allCompleted.get(queueId); 256 return completed == null ? 0 : completed.size(); 257 } 258 259 protected synchronized boolean isScheduled(String workId) { 260 for (BlockingQueue<Runnable> scheduled : allScheduled.values()) { 261 MemoryBlockingQueue q = (MemoryBlockingQueue) scheduled; 262 if (q.containsWorkId(workId)) { 263 return true; 264 } 265 } 266 return false; 267 } 268 269 protected synchronized boolean isRunning(String workId) { 270 for (Map<String, Work> running : allRunning.values()) { 271 if (running.containsKey(workId)) { 272 return true; 273 } 274 } 275 return false; 276 } 277 278 protected synchronized boolean isCompleted(String workId) { 279 for (Map<String, Work> completed : allCompleted.values()) { 280 if (completed.containsKey(workId)) { 281 return true; 282 } 283 } 284 return false; 285 } 286 287 protected synchronized Work findScheduled(String workId) { 288 for (BlockingQueue<Runnable> scheduled : allScheduled.values()) { 289 for (Runnable r : scheduled) { 290 Work w = WorkHolder.getWork(r); 291 if (w.getId().equals(workId)) { 292 return w; 293 } 294 } 295 } 296 return null; 297 } 298 299 protected synchronized Work findRunning(String workId) { 300 for (Map<String, Work> running : allRunning.values()) { 301 Work w = running.get(workId); 302 if (w != null) { 303 return w; 304 } 305 } 306 return null; 307 } 308 309 protected synchronized Work findCompleted(String workId) { 310 for (Map<String, Work> completed : allCompleted.values()) { 311 Work w = completed.get(workId); 312 if (w != null) { 313 return w; 314 } 315 } 316 return null; 317 } 318 319 // no synchronized as scheduled queue is thread-safe 320 protected List<Work> listScheduled(String queueId) { 321 BlockingQueue<Runnable> scheduled = getScheduledQueue(queueId); 322 List<Work> list = new ArrayList<Work>(scheduled.size()); 323 for (Runnable r : scheduled) { 324 Work w = WorkHolder.getWork(r); 325 list.add(w); 326 } 327 return list; 328 } 329 330 // called synchronized 331 protected List<Work> listRunning(String queueId) { 332 return new ArrayList<Work>(getRunning(queueId).values()); 333 } 334 335 // called synchronized 336 protected List<Work> listCompleted(String queueId) { 337 return new ArrayList<Work>(getCompleted(queueId).values()); 338 } 339 340 // no synchronized as scheduled queue is thread-safe 341 protected List<String> listScheduledIds(String queueId) { 342 BlockingQueue<Runnable> scheduled = getScheduledQueue(queueId); 343 List<String> list = new ArrayList<String>(scheduled.size()); 344 for (Runnable r : scheduled) { 345 Work w = WorkHolder.getWork(r); 346 list.add(w.getId()); 347 } 348 return list; 349 } 350 351 // called synchronized 352 protected List<String> listRunningIds(String queueId) { 353 return new ArrayList<String>(getRunning(queueId).keySet()); 354 } 355 356 // called synchronized 357 protected List<String> listNonCompletedIds(String queueId) { 358 List<String> list = listScheduledIds(queueId); 359 list.addAll(listRunningIds(queueId)); 360 return list; 361 } 362 363 // called synchronized 364 protected List<String> listCompletedIds(String queueId) { 365 return new ArrayList<String>(getCompleted(queueId).keySet()); 366 } 367 368 @Override 369 public Work removeScheduled(String queueId, String workId) { 370 for (Iterator<Runnable> it = getScheduledQueue(queueId).iterator(); it.hasNext();) { 371 Runnable r = it.next(); 372 Work w = WorkHolder.getWork(r); 373 if (w.getId().equals(workId)) { 374 it.remove(); 375 return w; 376 } 377 } 378 return null; 379 } 380 381 @Override 382 public int setSuspending(String queueId) { 383 // for in-memory queuing, there's no suspend 384 // drain scheduled queue and mark work canceled 385 List<Runnable> scheduled = new ArrayList<Runnable>(); 386 getScheduledQueue(queueId).drainTo(scheduled); 387 for (Runnable r : scheduled) { 388 Work work = WorkHolder.getWork(r); 389 work.setWorkInstanceState(State.CANCELED); 390 } 391 return scheduled.size(); 392 } 393 394 @Override 395 public Set<String> getCompletedQueueIds() { 396 return new HashSet<String>(allCompleted.keySet()); 397 } 398 399 @Override 400 public synchronized void clearCompletedWork(String queueId, long completionTime) { 401 Map<String, Work> completed = getCompleted(queueId); 402 if (completionTime <= 0) { 403 completed.clear(); 404 } else { 405 for (Iterator<Work> it = completed.values().iterator(); it.hasNext();) { 406 Work w = it.next(); 407 if (w.getCompletionTime() < completionTime) { 408 it.remove(); 409 } 410 } 411 } 412 } 413 414}