/*
 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Florent Guillaume
 *     Benoit Delbosc
 */
package org.nuxeo.ecm.core.work;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.event.EventServiceComponent;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.runtime.RuntimeServiceEvent;
import org.nuxeo.runtime.RuntimeServiceListener;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

/**
 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
 * implementation.
 * <p>
 * One {@link WorkThreadPoolExecutor} is kept per activated work queue; each executor tracks its own
 * scheduled-or-running count and forwards every change to a manager-wide {@link WorkCompletionSynchronizer}.
 *
 * @since 5.6
 */
public class WorkManagerImpl extends DefaultComponent implements WorkManager {

    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);

    /** Extension point receiving {@link WorkQueueDescriptor} contributions. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point receiving {@link WorkQueuingImplDescriptor} contributions. */
    protected static final String IMPL_EP = "implementation";

    /** Queue used by {@link #getCategoryQueueId(String)} when a category maps to no queue. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category used by {@link #getCategoryQueueId(String)} when the passed category is {@code null}. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of the names given to worker threads; followed by the queue id and a thread number. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // @GuardedBy("itself")
    protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this);

    // used synchronized
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Manager-wide synchronizer; per-queue synchronizers forward their signals to it. */
    protected final WorkCompletionSynchronizer completionSynchronizer = new WorkCompletionSynchronizer("all");

    // must be initialized after workQueueDescriptors, which is passed to the queuing constructor
    protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class);

    /**
     * Registers a runtime listener that closes the queuing when the runtime is about to stop.
     */
    @Override
    public void activate(ComponentContext context) {
        Framework.addListener(new ShutdownListener());
    }

    /**
     * Dispatches a contribution to the queue-descriptor or queuing-implementation registration.
     *
     * @throws RuntimeException if the extension point is neither {@link #QUEUES_EP} nor {@link #IMPL_EP}
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (QUEUES_EP.equals(extensionPoint)) {
            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
        } else if (IMPL_EP.equals(extensionPoint)) {
            registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
        } else {
            throw new RuntimeException("Unknown extension point: " + extensionPoint);
        }
    }

    /**
     * Dispatches the removal of a contribution to the matching unregistration method.
     *
     * @throws RuntimeException if the extension point is neither {@link #QUEUES_EP} nor {@link #IMPL_EP}
     */
    @Override
    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (QUEUES_EP.equals(extensionPoint)) {
            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
        } else if (IMPL_EP.equals(extensionPoint)) {
            unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
        } else {
            throw new RuntimeException("Unknown extension point: " + extensionPoint);
        }
    }

    /**
     * Registers a work queue descriptor.
     * <p>
     * A descriptor whose id is {@link WorkQueueDescriptor#ALL_QUEUES} is not stored itself: its processing/queuing
     * flags are re-contributed to every queue currently known to the registry. Such a descriptor is ignored (with an
     * error log) when it carries neither flag.
     *
     * @param workQueueDescriptor the descriptor to register
     */
    public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
        String queueId = workQueueDescriptor.id;
        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
            Boolean processing = workQueueDescriptor.processing;
            // named so that it does not shadow the "queuing" field of this class
            Boolean queuingFlag = workQueueDescriptor.queuing;
            if (processing == null && queuingFlag == null) {
                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
                        + " with no processing/queuing");
                return;
            }
            String what = processing == null ? "" : (" processing=" + processing);
            what += queuingFlag == null ? "" : (" queuing=" + queuingFlag);
            log.info("Setting on all work queues:" + what);
            // activate/deactivate processing/queuing on all queues
            List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy
            for (String id : queueIds) {
                // add an updated contribution redefining processing/queuing
                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
                wqd.id = id;
                wqd.processing = processing;
                wqd.queuing = queuingFlag;
                registerWorkQueueDescriptor(wqd);
            }
            return;
        }
        workQueueDescriptors.addContribution(workQueueDescriptor);
        // re-read from the registry so that the logged descriptor is the registered one
        WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId);
        log.info("Registered work queue " + queueId + " " + wqd.toString());
    }

    /**
     * Removes a work queue descriptor from the registry. Descriptors with the
     * {@link WorkQueueDescriptor#ALL_QUEUES} id are ignored.
     *
     * @param workQueueDescriptor the descriptor to unregister
     */
    public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
        String id = workQueueDescriptor.id;
        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
            return;
        }
        workQueueDescriptors.removeContribution(workQueueDescriptor);
        log.info("Unregistered work queue " + id);
    }

    /**
     * Makes sure an executor exists for the given queue and applies the descriptor's processing flag to its queue.
     *
     * @param workQueueDescriptor the descriptor of the queue to activate
     */
    protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) {
        String id = workQueueDescriptor.id;
        WorkThreadPoolExecutor executor = executors.get(id);
        if (executor == null) {
            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-");
            int maxPoolSize = workQueueDescriptor.getMaxThreads();
            executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory);
            // prestart all core threads so that direct additions to the queue
            // (from another Nuxeo instance) can be seen
            executor.prestartAllCoreThreads();
            executors.put(id, executor);
        }
        NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue();
        // get merged contrib
        // set active state
        queue.setActive(workQueueDescriptor.isProcessingEnabled());
        log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString());
    }

    /**
     * Shuts down and suspends the executor of the given queue, if it has one. Descriptors with the
     * {@link WorkQueueDescriptor#ALL_QUEUES} id are ignored.
     *
     * @param workQueueDescriptor the descriptor of the queue to deactivate
     */
    public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) {
        String id = workQueueDescriptor.id;
        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
            return;
        }
        WorkThreadPoolExecutor executor = executors.get(id);
        if (executor == null) {
            // no executor was created for this queue (manager not started) or it was already removed by a
            // shutdown: there is nothing to suspend, and dereferencing would throw a NullPointerException
            if (log.isDebugEnabled()) {
                log.debug("No executor to deactivate for work queue " + id);
            }
            return;
        }
        executor.shutdownAndSuspend();
        log.info("Deactivated work queue " + id);
    }

    /**
     * Instantiates the queuing implementation named by the descriptor and installs it.
     */
    public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
        WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass());
        registerWorkQueuing(q);
    }

    /**
     * Shuts the manager down (see {@link #closeQueuing()}) and replaces the queuing implementation.
     *
     * @param q the new queuing implementation
     */
    public void registerWorkQueuing(WorkQueuing q) {
        closeQueuing();
        queuing = q;
    }

    public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
        unregisterWorkQueing();
    }

    /**
     * Shuts the manager down and reverts to a fresh {@link MemoryWorkQueuing}.
     */
    public void unregisterWorkQueing() {
        closeQueuing();
        queuing = newWorkQueuing(MemoryWorkQueuing.class);
    }

    /**
     * Reflectively instantiates a queuing implementation through its
     * {@code (WorkManagerImpl, WorkQueueDescriptorRegistry)} constructor.
     *
     * @param klass the queuing implementation class
     * @return the new instance
     * @throws RuntimeException wrapping any reflection or security failure
     */
    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
        try {
            Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class,
                    WorkQueueDescriptorRegistry.class);
            return ctor.newInstance(this, workQueueDescriptors);
        } catch (ReflectiveOperationException | SecurityException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts down all executors, waiting at most 5 seconds.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted (the thread's
     *             interrupted status is restored first)
     */
    protected void closeQueuing() {
        try {
            shutdown(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupted status
            throw new RuntimeException(e);
        }
    }

    /**
     * @return {@code true} if the queue is known and its descriptor has queuing enabled
     */
    protected boolean isQueuingEnabled(String queueId) {
        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
        return wqd != null && wqd.isQueuingEnabled();
    }

    /**
     * @return {@code true} if the queue is known and its descriptor has processing enabled
     */
    protected boolean isProcessingEnabled(String queueId) {
        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
        return wqd != null && wqd.isProcessingEnabled();
    }

    // ----- WorkManager -----

    @Override
    public List<String> getWorkQueueIds() {
        synchronized (workQueueDescriptors) {
            return workQueueDescriptors.getQueueIds();
        }
    }

    @Override
    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
        synchronized (workQueueDescriptors) {
            return workQueueDescriptors.get(queueId);
        }
    }

    /**
     * Resolves the queue for a category, falling back to {@link #DEFAULT_CATEGORY} for a {@code null} category and
     * to {@link #DEFAULT_QUEUE_ID} when the registry knows no queue for the category.
     */
    @Override
    public String getCategoryQueueId(String category) {
        if (category == null) {
            category = DEFAULT_CATEGORY;
        }
        String queueId = workQueueDescriptors.getQueueId(category);
        if (queueId == null) {
            queueId = DEFAULT_QUEUE_ID;
        }
        return queueId;
    }

    @Override
    public int getApplicationStartedOrder() {
        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
    }

    @Override
    public void applicationStarted(ComponentContext context) {
        init();
    }

    protected volatile boolean started = false;

    /**
     * Starts the manager once: initializes the queuing and activates every registered queue. Subsequent calls are
     * no-ops until {@link #shutdown(long, TimeUnit)} resets the started flag.
     */
    @Override
    public void init() {
        if (started) {
            return;
        }
        synchronized (this) {
            // re-check under the lock: another thread may have completed the start meanwhile
            if (started) {
                return;
            }
            started = true;
            queuing.init();
            for (String id : workQueueDescriptors.getQueueIds()) {
                activateQueue(workQueueDescriptors.get(id));
            }
        }
    }

    /**
     * Looks up the executor of a queue.
     * <p>
     * If the manager is not started, it is lazily started when the framework is in test mode and not shutting down;
     * otherwise the call fails.
     *
     * @param queueId the queue id
     * @return the executor registered for the queue; this is whatever the executors map holds, which is
     *         {@code null} if the executor was removed, e.g. by {@link #shutdownQueue(String, long, TimeUnit)}
     * @throws IllegalStateException if the manager is not started and cannot be lazily started
     * @throws IllegalArgumentException if no descriptor is registered for the queue
     */
    protected WorkThreadPoolExecutor getExecutor(String queueId) {
        if (!started) {
            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
                log.warn("Lazy starting of work manager in test mode");
                init();
            } else {
                throw new IllegalStateException("Work manager not started, could not access to executors");
            }
        }
        WorkQueueDescriptor workQueueDescriptor;
        synchronized (workQueueDescriptors) {
            workQueueDescriptor = workQueueDescriptors.get(queueId);
        }
        if (workQueueDescriptor == null) {
            throw new IllegalArgumentException("No such work queue: " + queueId);
        }

        return executors.get(queueId);
    }

    /**
     * Shuts down the executor of one queue and forgets it.
     *
     * @return {@code true} if the executor terminated within the timeout
     */
    @Override
    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
        WorkThreadPoolExecutor executor = getExecutor(queueId);
        boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit);
        removeExecutor(queueId); // start afresh
        return terminated;
    }

    /**
     * Asks every executor to shut down and suspend, then waits for them within a shared time budget. Executors that
     * do not terminate in time are shut down immediately with {@code shutdownNow()}.
     *
     * @param list the executors to shut down
     * @param timeout the overall time to wait for all executors
     * @param unit the timeout unit
     * @return {@code true} if all executors terminated within the timeout
     */
    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
            throws InterruptedException {
        // mark executors as shutting down
        for (WorkThreadPoolExecutor executor : list) {
            executor.shutdownAndSuspend();
        }

        long t0 = System.currentTimeMillis();
        long delay = unit.toMillis(timeout);

        // wait for termination or suspension
        boolean terminated = true;
        for (WorkThreadPoolExecutor executor : list) {
            // the budget is shared: each executor only gets what is left of the overall delay
            long remaining = remainingMillis(t0, delay);
            if (!executor.awaitTerminationOrSave(remaining, TimeUnit.MILLISECONDS)) {
                terminated = false;
                // hard shutdown for remaining tasks
                executor.shutdownNow();
            }
        }

        return terminated;
    }

    /**
     * @param t0 the start time, in milliseconds
     * @param delay the total allowed delay, in milliseconds
     * @return the milliseconds left before {@code t0 + delay}, or 0 if that time has passed
     */
    protected long remainingMillis(long t0, long delay) {
        long d = System.currentTimeMillis() - t0;
        if (d > delay) {
            return 0;
        }
        return delay - d;
    }

    protected synchronized void removeExecutor(String queueId) {
        executors.remove(queueId);
    }

    /**
     * Shuts down all executors and marks the manager as not started, so that a later {@link #init()} recreates
     * them.
     *
     * @return {@code true} if all executors terminated within the timeout
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        // snapshot first: the map is cleared before the (possibly long) shutdown wait
        List<WorkThreadPoolExecutor> executorList = new ArrayList<>(executors.values());
        executors.clear();
        started = false;
        return shutdownExecutors(executorList, timeout, unit);
    }

    /**
     * Closes the queuing when the runtime is about to stop, and unregisters itself.
     */
    protected class ShutdownListener implements RuntimeServiceListener {
        @Override
        public void handleEvent(RuntimeServiceEvent event) {
            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
                return;
            }
            Framework.removeListener(this);
            closeQueuing();
        }
    }

    /**
     * A work instance and how to schedule it, for schedule-after-commit.
     *
     * @since 5.8
     */
    public class WorkScheduling implements Synchronization {
        public final Work work;

        public final Scheduling scheduling;

        public WorkScheduling(Work work, Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        @Override
        public void beforeCompletion() {
        }

        /**
         * Schedules the work if the transaction committed, cancels it if the transaction rolled back.
         *
         * @throws IllegalArgumentException for any other transaction status
         */
        @Override
        public void afterCompletion(int status) {
            if (status == Status.STATUS_COMMITTED) {
                schedule(work, scheduling, false);
            } else if (status == Status.STATUS_ROLLEDBACK) {
                work.setWorkInstanceState(State.CANCELED);
            } else {
                throw new IllegalArgumentException("Unsupported transaction status " + status);
            }
        }
    }

    /**
     * Creates non-daemon threads at normal priority.
     */
    private static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    log.error("Uncaught error on thread " + t.getName(), e);
                }
            });
            return thread;
        }
    }

    /**
     * A handler for rejected tasks that discards them.
     */
    public static class CancelingPolicy implements RejectedExecutionHandler {

        public static final CancelingPolicy INSTANCE = new CancelingPolicy();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            ((WorkThreadPoolExecutor) executor).removedFromQueue(r);
        }
    }

    /**
     * Counts scheduled-or-running work instances and lets callers wait until the count drops to zero.
     * <p>
     * Every instance other than the manager-wide one forwards its schedule/completion signals to the manager-wide
     * {@code completionSynchronizer} of the enclosing {@link WorkManagerImpl}.
     */
    public class WorkCompletionSynchronizer {

        protected final AtomicInteger scheduledOrRunning = new AtomicInteger(0);

        protected final ReentrantLock completionLock = new ReentrantLock();

        protected final Condition completion = completionLock.newCondition();

        @SuppressWarnings("hiding")
        protected final Log log = LogFactory.getLog(WorkCompletionSynchronizer.class);

        protected final String queueid;

        protected WorkCompletionSynchronizer(String id) {
            queueid = id;
        }

        /**
         * Waits until no work is scheduled or running, or until the timeout elapses.
         *
         * @param timeout the maximum time to wait, in nanoseconds
         * @return the remaining time in nanoseconds; a value that is not positive means the wait timed out, or that
         *         the passed timeout was itself not positive (in which case no check is made at all)
         */
        protected long await(long timeout) throws InterruptedException {
            completionLock.lock();
            try {
                while (timeout > 0 && scheduledOrRunning.get() > 0) {
                    timeout = completion.awaitNanos(timeout);
                }
            } finally {
                if (log.isTraceEnabled()) {
                    log.trace("returning from await");
                }
                completionLock.unlock();
            }
            return timeout;
        }

        /**
         * Records that one more work instance is scheduled.
         */
        protected void signalSchedule() {
            int value = scheduledOrRunning.incrementAndGet();
            if (log.isTraceEnabled()) {
                logScheduleAndRunning("scheduled", value);
            }
            // forward to the manager-wide synchronizer, unless this is it
            if (completionSynchronizer != this) {
                completionSynchronizer.signalSchedule();
            }
        }

        /**
         * Records that one work instance is no longer scheduled or running, waking up waiters when the count
         * reaches zero.
         */
        protected void signalCompletion() {
            final int value = scheduledOrRunning.decrementAndGet();
            if (value == 0) {
                completionLock.lock();
                try {
                    completion.signalAll();
                } finally {
                    completionLock.unlock();
                }
            }
            if (log.isTraceEnabled()) {
                logScheduleAndRunning("completed", value);
            }
            // forward to the manager-wide synchronizer, unless this is it
            if (completionSynchronizer != this) {
                completionSynchronizer.signalCompletion();
            }
        }

        protected void logScheduleAndRunning(String event, int value) {
            // the Throwable is only there to get the caller's stack trace into the log
            log.trace(event + " [" + queueid + "," + value + "]", new Throwable("stack trace"));
        }

    }

    /**
     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
     * <p>
     * Completed tasks are passed to another queue.
     * <p>
     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
     * persisted, etc).
     *
     * @since 5.6
     */
    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {

        protected final String queueId;

        /** Per-queue synchronizer; hides the manager-wide one of the enclosing class. */
        protected final WorkCompletionSynchronizer completionSynchronizer;

        /**
         * List of running Work instances, in order to be able to interrupt them if requested.
         */
        // @GuardedBy("itself")
        protected final List<Work> running;

        // metrics

        protected final Counter scheduledCount;

        protected final Counter scheduledMax;

        protected final Counter runningCount;

        protected final Counter completedCount;

        protected final Timer workTimer;

        protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, ThreadFactory threadFactory) {
            // the work queue is provided by the queuing implementation of the enclosing manager
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initScheduleQueue(queueId), threadFactory);
            this.queueId = queueId;
            completionSynchronizer = new WorkCompletionSynchronizer(queueId);
            running = new LinkedList<>();
            // init metrics
            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
            scheduledMax = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "max"));
            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
        }

        public int getScheduledOrRunningSize() {
            return completionSynchronizer.scheduledOrRunning.get();
        }

        /**
         * Not supported: work must be submitted through {@link #execute(Work)}.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void execute(Runnable r) {
            throw new UnsupportedOperationException("use other api");
        }

        /**
         * Executes the given task sometime in the future.
         * <p>
         * If the work cannot be added to the queue, the scheduled counter and the completion synchronizer are
         * rolled back before the failure is propagated.
         *
         * @param work the work to execute
         * @see #execute(Runnable)
         */
        public void execute(Work work) {
            scheduledCount.inc();
            if (scheduledCount.getCount() > scheduledMax.getCount()) {
                scheduledMax.inc();
            }
            completionSynchronizer.signalSchedule();
            boolean ok = false;
            try {
                submit(work);
                ok = true;
            } finally {
                if (!ok) {
                    // the work never reached the queue, so beforeExecute will never decrement the counter for it
                    scheduledCount.dec();
                    completionSynchronizer.signalCompletion();
                }
            }
        }

        /**
         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
         * directly
         *
         * @param work the work to add to the scheduled queue
         * @throws RuntimeException if the queue refuses the work
         */
        protected void submit(Work work) throws RuntimeException {
            BlockingQueue<Runnable> queue = queuing.getScheduledQueue(queueId);
            boolean added = queue.offer(new WorkHolder(work));
            if (!added) {
                throw new RuntimeException("queue should have blocked");
            }
            // DO NOT super.execute(new WorkHolder(work));
        }

        /**
         * Marks the work as running, in the work itself, in the queuing, in the running list and in the metrics.
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Work work = WorkHolder.getWork(r);
            work.setWorkInstanceState(State.RUNNING);
            queuing.workRunning(queueId, work);
            synchronized (running) {
                running.add(work);
            }
            // metrics
            scheduledCount.dec();
            runningCount.inc();
        }

        /**
         * Records the outcome of a work: {@code FAILED} if it threw, {@code SCHEDULED} if it suspended itself,
         * {@code COMPLETED} otherwise. The completion synchronizer is always signalled.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                Work work = WorkHolder.getWork(r);
                synchronized (running) {
                    running.remove(work);
                }
                State state;
                if (t == null) {
                    if (work.isWorkInstanceSuspended()) {
                        state = State.SCHEDULED;
                    } else {
                        state = State.COMPLETED;
                    }
                } else {
                    state = State.FAILED;
                }
                work.setWorkInstanceState(state);
                queuing.workCompleted(queueId, work);
                // metrics
                runningCount.dec();
                completedCount.inc();
                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
            } finally {
                completionSynchronizer.signalCompletion();
            }
        }

        // called during shutdown
        // with tasks from the queue if new tasks are submitted
        // or with tasks drained from the queue
        protected void removedFromQueue(Runnable r) {
            Work work = WorkHolder.getWork(r);
            work.setWorkInstanceState(State.CANCELED);
            completionSynchronizer.signalCompletion();
        }

        /**
         * Initiates a shutdown of this executor and asks for work instances to suspend themselves. The scheduled work
         * instances are drained and suspended.
         */
        public void shutdownAndSuspend() {
            // rejected tasks will be discarded
            setRejectedExecutionHandler(CancelingPolicy.INSTANCE);
            // shutdown the executor
            // if a new task is scheduled it will be rejected -> discarded
            shutdown();
            // request all scheduled work instances to suspend (cancel)
            int n = queuing.setSuspending(queueId);
            // NOTE(review): this adjusts the per-queue counter directly, so neither waiters nor the manager-wide
            // synchronizer are notified of these n removals - confirm this is intended
            completionSynchronizer.scheduledOrRunning.addAndGet(-n);
            // request all running work instances to suspend (stop)
            synchronized (running) {
                for (Work work : running) {
                    work.setWorkInstanceSuspending();
                }
            }
        }

        /**
         * Blocks until all work instances have completed after a shutdown and suspend request.
         * <p>
         * If the executor has not terminated when the timeout elapses, the work still in the queue is drained and
         * canceled.
         *
         * @param timeout the time to wait
         * @param unit the timeout unit
         * @return true if all work stopped or was saved, false if some remaining after timeout
         */
        public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException {
            boolean terminated = super.awaitTermination(timeout, unit);
            if (!terminated) {
                // drain queue from remaining scheduled work
                List<Runnable> drained = new ArrayList<>();
                getQueue().drainTo(drained);
                for (Runnable r : drained) {
                    removedFromQueue(r);
                }
            }
            // some work still remaining after timeout
            return terminated;
        }

        /**
         * Removes a scheduled work from the queuing and, if one was removed, signals its completion.
         *
         * @param workId the id of the work to remove
         * @return the removed work, or {@code null} if the queuing removed none
         */
        public Work removeScheduled(String workId) {
            Work w = queuing.removeScheduled(queueId, workId);
            if (w != null) {
                completionSynchronizer.signalCompletion();
            }
            return w;
        }

    }

    @Override
    public void schedule(Work work) {
        schedule(work, Scheduling.ENQUEUE, false);
    }

    @Override
    public void schedule(Work work, boolean afterCommit) {
        schedule(work, Scheduling.ENQUEUE, afterCommit);
    }

    @Override
    public void schedule(Work work, Scheduling scheduling) {
        schedule(work, scheduling, false);
    }

    /**
     * Schedules a work on the queue of its category.
     * <p>
     * The work is canceled instead if queuing is disabled for that queue. When {@code afterCommit} is set and a
     * transaction is active, scheduling is deferred to the transaction commit.
     *
     * @param work the work to schedule
     * @param scheduling how to deal with an already scheduled or running work of the same id
     * @param afterCommit whether to defer the scheduling until the current transaction commits
     */
    @Override
    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
        String workId = work.getId();
        String queueId = getCategoryQueueId(work.getCategory());
        if (!isQueuingEnabled(queueId)) {
            work.setWorkInstanceState(State.CANCELED);
            return;
        }
        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
            return;
        }
        work.setWorkInstanceState(State.SCHEDULED);
        WorkSchedulePath.newInstance(work);
        if (log.isTraceEnabled()) {
            log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack());
        } else if (log.isDebugEnabled()) {
            log.debug("Scheduling work: " + work + " using queue: " + queueId);
        }
        switch (scheduling) {
        case ENQUEUE:
            break;
        case CANCEL_SCHEDULED:
            Work w = getExecutor(queueId).removeScheduled(workId);
            if (w != null) {
                w.setWorkInstanceState(State.CANCELED);
                if (log.isDebugEnabled()) {
                    log.debug("Canceling existing scheduled work before scheduling ("
                            + completionSynchronizer.scheduledOrRunning.get() + ")");
                }
            }
            break;
        case IF_NOT_SCHEDULED:
        case IF_NOT_RUNNING:
        case IF_NOT_RUNNING_OR_SCHEDULED:
            // TODO disabled for now because hasWorkInState uses isScheduled
            // which is buggy
            boolean disabled = Boolean.TRUE.booleanValue();
            if (!disabled && hasWorkInState(workId, scheduling.state)) {
                // mark passed work as canceled
                work.setWorkInstanceState(State.CANCELED);
                if (log.isDebugEnabled()) {
                    log.debug("Canceling schedule because found: " + scheduling);
                }
                return;

            }
            break;

        }
        getExecutor(queueId).execute(work);
    }

    /**
     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
     *
     * @since 5.8
     */
    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        } catch (NamingException e) {
            // a failed lookup is handled like a missing transaction manager
            transactionManager = null;
        }
        if (transactionManager == null) {
            if (log.isDebugEnabled()) {
                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
            }
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
                }
                return false;
            }
            int status = transaction.getStatus();
            if (status == Status.STATUS_ACTIVE) {
                if (log.isDebugEnabled()) {
                    log.debug("Scheduling work after commit: " + work);
                }
                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
                return true;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
                            + work);
                }
                return false;
            }
        } catch (SystemException | RollbackException e) {
            log.error("Cannot schedule after commit", e);
            return false;
        }
    }

    /**
     * Finds a work by the id of the passed work. {@code useEquals} is ignored and {@code pos[0]}, if provided, is
     * always set to 0.
     */
    @Override
    @Deprecated
    public Work find(Work work, State state, boolean useEquals, int[] pos) {
        if (pos != null) {
            pos[0] = 0; // compat
        }
        String workId = work.getId();
        return queuing.find(workId, state);
    }

    @Override
    public Work find(String workId, State state) {
        return queuing.find(workId, state);
    }

    /**
     * @return the result of the completed work of the given id, or {@code null} if no such completed work is found
     */
    @Override
    public String findResult(String workId) {
        Work work = find(workId, State.COMPLETED);
        return work != null ? work.getWorkInstanceResult() : null;
    }

    /**
     * @param state SCHEDULED, RUNNING or null for both
     */
    protected boolean hasWorkInState(String workId, State state) {
        return queuing.isWorkInState(workId, state);
    }

    @Override
    public State getWorkState(String workId) {
        return queuing.getWorkState(workId);
    }

    @Override
    public List<Work> listWork(String queueId, State state) {
        // don't return scheduled after commit
        return queuing.listWork(queueId, state);
    }

    @Override
    public List<String> listWorkIds(String queueId, State state) {
        return queuing.listWorkIds(queueId, state);
    }

    /**
     * Returns the number of work instances of a queue in the given state.
     *
     * @param queueId the queue id
     * @param state SCHEDULED, RUNNING, COMPLETED, or {@code null} for scheduled-or-running
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public int getQueueSize(String queueId, State state) {
        if (state == null) {
            return getScheduledOrRunningSize(queueId);
        }
        if (state == State.SCHEDULED) {
            return getScheduledSize(queueId);
        } else if (state == State.RUNNING) {
            return getRunningSize(queueId);
        } else if (state == State.COMPLETED) {
            return getCompletedSize(queueId);
        } else {
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    @Override
    @Deprecated
    public int getNonCompletedWorkSize(String queueId) {
        return getScheduledOrRunningSize(queueId);
    }

    protected int getScheduledSize(String queueId) {
        return queuing.getQueueSize(queueId, State.SCHEDULED);
    }

    protected int getRunningSize(String queueId) {
        return queuing.getQueueSize(queueId, State.RUNNING);
    }

    protected int getScheduledOrRunningSize(String queueId) {
        // check the thread pool directly, this avoids race conditions
        // because queuing.getRunningSize takes a bit of time to be
        // accurate after a work is scheduled
        return getExecutor(queueId).getScheduledOrRunningSize();
    }

    protected int getCompletedSize(String queueId) {
        return queuing.getQueueSize(queueId, State.COMPLETED);
    }

    /**
     * Waits for the work of one queue to complete.
     *
     * @return {@code true} if time was left when the wait ended; note that a non-positive duration therefore always
     *         yields {@code false}
     */
    @Override
    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        return getExecutor(queueId).completionSynchronizer.await(unit.toNanos(duration)) > 0;
    }

    /**
     * Waits for the work of all queues to complete.
     *
     * @return {@code true} if time was left when the wait ended; note that a non-positive duration therefore always
     *         yields {@code false}
     */
    @Override
    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
        return completionSynchronizer.await(unit.toNanos(duration)) > 0;
    }

    @Override
    public synchronized void clearCompletedWork(String queueId) {
        queuing.clearCompletedWork(queueId, 0);
    }

    @Override
    public synchronized void clearCompletedWork(long completionTime) {
        for (String queueId : queuing.getCompletedQueueIds()) {
            queuing.clearCompletedWork(queueId, completionTime);
        }
    }

    /**
     * Clears, for every known queue that has a positive clear-completed delay, the completed work older than that
     * delay.
     */
    @Override
    public synchronized void cleanup() {
        log.debug("Clearing old completed work");
        for (String queueId : queuing.getCompletedQueueIds()) {
            WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId);
            if (workQueueDescriptor == null) {
                // unknown queue
                continue;
            }
            long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L;
            if (delay > 0) {
                long completionTime = System.currentTimeMillis() - delay;
                queuing.clearCompletedWork(queueId, completionTime);
            }
        }
    }

    @Override
    public boolean isStarted() {
        return started;
    }

}