001/* 002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedHashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.BlockingQueue; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.work.api.Work; 035import org.nuxeo.ecm.core.work.api.Work.State; 036import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 037 038/** 039 * Implementation of a {@link WorkQueuing} using in-memory queuing. 040 * 041 * @since 5.8 042 */ 043public class MemoryWorkQueuing implements WorkQueuing { 044 045 private static final Log log = LogFactory.getLog(MemoryWorkQueuing.class); 046 047 protected final WorkManagerImpl mgr; 048 049 // @GuardedBy("this") 050 protected final WorkQueueDescriptorRegistry workQueueDescriptors; 051 052 // @GuardedBy("this") 053 protected final Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<>(); 054 055 // @GuardedBy("this") 056 // queueId -> workId -> work 057 protected final Map<String, Map<String, Work>> allScheduled = new HashMap<>(); 058 059 // @GuardedBy("this") 060 // queueId -> workId -> work 061 protected final Map<String, Map<String, Work>> allRunning = new HashMap<>(); 062 063 // @GuardedBy("this") 064 // queueId -> workId -> work 065 protected final Map<String, Map<String, Work>> allCompleted = new HashMap<>(); 066 067 public MemoryWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) { 068 this.mgr = mgr; 069 this.workQueueDescriptors = workQueueDescriptors; 070 } 071 072 @Override 073 public synchronized void init() { 074 allQueued.clear(); 075 allScheduled.clear(); 076 allRunning.clear(); 077 allCompleted.clear(); 078 } 079 080 // called synchronized 081 protected WorkQueueDescriptor getDescriptor(String queueId) { 082 WorkQueueDescriptor descriptor = workQueueDescriptors.get(queueId); 083 if (descriptor == null) { 084 throw new IllegalArgumentException("No such work queue: " + queueId); 085 } 086 return descriptor; 087 } 088 089 @Override 090 public BlockingQueue<Runnable> initWorkQueue(String queueId) { 091 if (allQueued.containsKey(queueId)) { 092 throw new IllegalStateException(queueId + " was already configured"); 093 } 094 final BlockingQueue<Runnable> queue = newBlockingQueue(getDescriptor(queueId)); 095 allQueued.put(queueId, queue); 096 return queue; 097 } 098 099 public BlockingQueue<Runnable> getWorkQueue(String queueId) { 100 if (!allQueued.containsKey(queueId)) { 101 throw new IllegalStateException(queueId + " was not configured yet"); 102 } 103 return allQueued.get(queueId); 104 } 105 106 @Override 107 public synchronized boolean workSchedule(String queueId, Work work) { 108 boolean ret = getWorkQueue(queueId).offer(new WorkHolder(work)); 109 if (ret) { 110 getScheduled(queueId).put(work.getId(), work); 111 } 112 return ret; 113 } 114 115 // called synchronized 116 protected Map<String, Work> getScheduled(String queueId) { 117 Map<String, Work> scheduled = allScheduled.get(queueId); 118 if (scheduled == null) { 119 allScheduled.put(queueId, scheduled = newScheduledMap()); 120 } 121 return scheduled; 122 } 123 124 // called synchronized 125 protected Map<String, Work> getRunning(String queueId) { 126 Map<String, Work> running = allRunning.get(queueId); 127 if (running == null) { 128 allRunning.put(queueId, running = newRunningMap()); 129 } 130 return running; 131 } 132 133 // called synchronized 134 protected Map<String, Work> getCompleted(String queueId) { 135 Map<String, Work> completed = allCompleted.get(queueId); 136 if (completed == null) { 137 allCompleted.put(queueId, completed = newCompletedMap()); 138 } 139 return completed; 140 } 141 142 protected BlockingQueue<Runnable> newBlockingQueue(WorkQueueDescriptor workQueueDescriptor) { 143 int capacity = workQueueDescriptor.getCapacity(); 144 if (capacity <= 0) { 145 capacity = -1; // unbounded 146 } 147 return new MemoryBlockingQueue(capacity); 148 } 149 150 protected Map<String, Work> newScheduledMap() { 151 return new HashMap<>(); 152 } 153 154 protected Map<String, Work> newRunningMap() { 155 return new HashMap<>(); 156 } 157 158 protected Map<String, Work> newCompletedMap() { 159 return new LinkedHashMap<>(); 160 } 161 162 @Override 163 public synchronized void workRunning(String queueId, Work work) { 164 // at this time the work is already taken from the queue by the thread pool 165 Work ret = getRunning(queueId).put(work.getId(), work); 166 if (ret != null) { 167 log.warn("Running a work with the same ID " + work.getId() + " multiple time, already running " + ret + 168 ", new work: " + work); 169 } 170 getScheduled(queueId).remove(work.getId()); 171 if (log.isTraceEnabled()) { 172 log.trace("Work running " + work.getId() + " on " + queueId + ", total running: " + 173 count(queueId, State.RUNNING)); 174 } 175 } 176 177 @Override 178 public synchronized void workCompleted(String queueId, Work work) { 179 if (log.isTraceEnabled()) { 180 log.trace("Work completed " + work.getId()); 181 } 182 getCompleted(queueId).put(work.getId(), work); 183 getRunning(queueId).remove(work.getId()); 184 } 185 186 @Override 187 public Work find(String workId, State state) { 188 if (state == null) { 189 Work w = findScheduled(workId); 190 if (w == null) { 191 w = findRunning(workId); 192 } 193 return w; 194 } 195 switch (state) { 196 case SCHEDULED: 197 return findScheduled(workId); 198 case RUNNING: 199 return findRunning(workId); 200 case COMPLETED: 201 return findCompleted(workId); 202 default: 203 return null; 204 } 205 } 206 207 @Override 208 public boolean isWorkInState(String workId, State state) { 209 if (state == null) { 210 return isScheduled(workId) || isRunning(workId); 211 } 212 switch (state) { 213 case SCHEDULED: 214 return isScheduled(workId); 215 case RUNNING: 216 return isRunning(workId); 217 case COMPLETED: 218 return isCompleted(workId); 219 default: 220 return false; 221 } 222 } 223 224 @Override 225 public State getWorkState(String workId) { 226 // TODO this is linear, but isScheduled is buggy 227 if (findScheduled(workId) != null) { 228 return State.SCHEDULED; 229 } 230 if (isRunning(workId)) { 231 return State.RUNNING; 232 } 233 if (isCompleted(workId)) { 234 return State.COMPLETED; 235 } 236 return null; 237 } 238 239 @Override 240 public synchronized List<Work> listWork(String queueId, State state) { 241 switch (state) { 242 case SCHEDULED: 243 return listScheduled(queueId); 244 case RUNNING: 245 return listRunning(queueId); 246 case COMPLETED: 247 return listCompleted(queueId); 248 default: 249 throw new IllegalArgumentException(String.valueOf(state)); 250 } 251 } 252 253 @Override 254 public synchronized List<String> listWorkIds(String queueId, State state) { 255 if (state == null) { 256 return listNonCompletedIds(queueId); 257 } 258 switch (state) { 259 case SCHEDULED: 260 return listScheduledIds(queueId); 261 case RUNNING: 262 return listRunningIds(queueId); 263 case COMPLETED: 264 return listCompletedIds(queueId); 265 default: 266 throw new IllegalArgumentException(String.valueOf(state)); 267 } 268 } 269 270 @Override 271 public int count(String queueId, State state) { 272 switch (state) { 273 case SCHEDULED: 274 return getScheduledSize(queueId); 275 case RUNNING: 276 return getRunningSize(queueId); 277 case COMPLETED: 278 return getCompletedSize(queueId); 279 default: 280 throw new IllegalArgumentException(String.valueOf(state)); 281 } 282 } 283 284 protected synchronized int getScheduledSize(String queueId) { 285 Map<String, Work> scheduled = allScheduled.get(queueId); 286 return scheduled == null ? 0 : scheduled.size(); 287 } 288 289 protected synchronized int getRunningSize(String queueId) { 290 Map<String, Work> running = allRunning.get(queueId); 291 return running == null ? 0 : running.size(); 292 } 293 294 protected synchronized int getCompletedSize(String queueId) { 295 Map<String, Work> completed = allCompleted.get(queueId); 296 return completed == null ? 0 : completed.size(); 297 } 298 299 protected synchronized boolean isScheduled(String workId) { 300 return find(workId, allScheduled) != null; 301 } 302 303 protected synchronized boolean isRunning(String workId) { 304 return find(workId, allRunning) != null; 305 } 306 307 protected synchronized boolean isCompleted(String workId) { 308 return find(workId, allCompleted) != null; 309 } 310 311 protected synchronized Work findScheduled(String workId) { 312 return find(workId, allScheduled); 313 } 314 315 protected synchronized Work findRunning(String workId) { 316 return find(workId, allRunning); 317 } 318 319 protected synchronized Work findCompleted(String workId) { 320 return find(workId, allCompleted); 321 } 322 323 private Work find(String workId, Map<String, Map<String, Work>> allWorks) { 324 for (Map<String, Work> works : allWorks.values()) { 325 Work ret = works.get(workId); 326 if (ret != null) { 327 return ret; 328 } 329 } 330 return null; 331 } 332 333 // called synchronized 334 protected List<Work> listScheduled(String queueId) { 335 return new ArrayList<>(getScheduled(queueId).values()); 336 } 337 338 // called synchronized 339 protected List<Work> listRunning(String queueId) { 340 return new ArrayList<>(getRunning(queueId).values()); 341 } 342 343 // called synchronized 344 protected List<Work> listCompleted(String queueId) { 345 return new ArrayList<>(getCompleted(queueId).values()); 346 } 347 348 // called synchronized 349 protected List<String> listScheduledIds(String queueId) { 350 return new ArrayList<>(getScheduled(queueId).keySet()); 351 } 352 353 // called synchronized 354 protected List<String> listRunningIds(String queueId) { 355 return new ArrayList<>(getRunning(queueId).keySet()); 356 } 357 358 // called synchronized 359 protected List<String> listNonCompletedIds(String queueId) { 360 List<String> list = listScheduledIds(queueId); 361 list.addAll(listRunningIds(queueId)); 362 return list; 363 } 364 365 // called synchronized 366 protected List<String> listCompletedIds(String queueId) { 367 return new ArrayList<>(getCompleted(queueId).keySet()); 368 } 369 370 @Override 371 public Work removeScheduled(String queueId, String workId) { 372 removeFromScheduledSet(queueId, workId); 373 for (Iterator<Runnable> it = getWorkQueue(queueId).iterator(); it.hasNext();) { 374 Runnable r = it.next(); 375 Work w = WorkHolder.getWork(r); 376 if (w.getId().equals(workId)) { 377 it.remove(); 378 return w; 379 } 380 } 381 return null; 382 } 383 384 protected Work removeFromScheduledSet(String queueId, String workId) { 385 for (Iterator<Work> it = getScheduled(queueId).values().iterator(); it.hasNext();) { 386 Work w = it.next(); 387 if (w.getId().equals(workId)) { 388 it.remove(); 389 return w; 390 } 391 } 392 return null; 393 } 394 395 @Override 396 public int setSuspending(String queueId) { 397 // for in-memory queuing, there's no suspend 398 // drain scheduled queue and mark work canceled 399 List<Runnable> scheduled = new ArrayList<>(); 400 getWorkQueue(queueId).drainTo(scheduled); 401 for (Runnable r : scheduled) { 402 Work work = WorkHolder.getWork(r); 403 work.setWorkInstanceState(State.CANCELED); 404 } 405 getScheduled(queueId).clear(); 406 return scheduled.size(); 407 } 408 409 @Override 410 public Set<String> getCompletedQueueIds() { 411 return new HashSet<>(allCompleted.keySet()); 412 } 413 414 @Override 415 public synchronized void clearCompletedWork(String queueId, long completionTime) { 416 Map<String, Work> completed = getCompleted(queueId); 417 if (completionTime <= 0) { 418 completed.clear(); 419 } else { 420 for (Iterator<Work> it = completed.values().iterator(); it.hasNext();) { 421 Work w = it.next(); 422 if (w.getCompletionTime() < completionTime) { 423 it.remove(); 424 } 425 } 426 } 427 } 428 429}