001/* 002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import com.codahale.metrics.Counter; 023import com.codahale.metrics.MetricRegistry; 024import com.codahale.metrics.SharedMetricRegistries; 025import com.codahale.metrics.Timer; 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.common.logging.SequenceTracer; 029import org.nuxeo.ecm.core.event.EventServiceComponent; 030import org.nuxeo.ecm.core.work.api.Work; 031import org.nuxeo.ecm.core.work.api.Work.State; 032import org.nuxeo.ecm.core.work.api.WorkManager; 033import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 034import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor; 035import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 036import org.nuxeo.runtime.RuntimeServiceEvent; 037import org.nuxeo.runtime.RuntimeServiceListener; 038import org.nuxeo.runtime.api.Framework; 039import org.nuxeo.runtime.metrics.MetricsService; 040import org.nuxeo.runtime.model.ComponentContext; 041import org.nuxeo.runtime.model.ComponentInstance; 042import org.nuxeo.runtime.model.DefaultComponent; 043import org.nuxeo.runtime.transaction.TransactionHelper; 044 045import java.lang.reflect.Constructor; 046import java.util.ArrayList; 047import java.util.Collection; 048import java.util.Collections; 049import java.util.HashMap; 050import java.util.LinkedList; 051import java.util.List; 052import java.util.Map; 053import java.util.concurrent.BlockingQueue; 054import java.util.concurrent.RejectedExecutionHandler; 055import java.util.concurrent.ThreadFactory; 056import java.util.concurrent.ThreadPoolExecutor; 057import java.util.concurrent.TimeUnit; 058import java.util.concurrent.atomic.AtomicInteger; 059import java.util.concurrent.locks.Condition; 060import java.util.concurrent.locks.ReentrantLock; 061 062import javax.naming.NamingException; 063import javax.transaction.RollbackException; 064import javax.transaction.Status; 065import javax.transaction.Synchronization; 066import javax.transaction.SystemException; 067import javax.transaction.Transaction; 068import javax.transaction.TransactionManager; 069 070/** 071 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 072 * implementation. 073 * 074 * @since 5.6 075 */ 076public class WorkManagerImpl extends DefaultComponent implements WorkManager { 077 078 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 079 080 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 081 082 protected static final String QUEUES_EP = "queues"; 083 084 protected static final String IMPL_EP = "implementation"; 085 086 public static final String DEFAULT_QUEUE_ID = "default"; 087 088 public static final String DEFAULT_CATEGORY = "default"; 089 090 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 091 092 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 093 094 // @GuardedBy("itself") 095 protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this); 096 097 // used synchronized 098 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 099 100 protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class); 101 102 /** 103 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from 104 * another node in cluster mode. 105 */ 106 protected class WorkCompletionSynchronizer { 107 final protected ReentrantLock lock = new ReentrantLock(); 108 final protected Condition condition = lock.newCondition(); 109 110 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 111 lock.lock(); 112 try { 113 return condition.await(timeMs, TimeUnit.MILLISECONDS); 114 } finally { 115 lock.unlock(); 116 } 117 } 118 119 protected void signalCompletedWork() { 120 lock.lock(); 121 try { 122 condition.signalAll(); 123 } finally { 124 lock.unlock(); 125 } 126 } 127 128 } 129 130 protected WorkCompletionSynchronizer completionSynchronizer = new WorkCompletionSynchronizer(); 131 132 @Override 133 public void activate(ComponentContext context) { 134 Framework.addListener(new ShutdownListener()); 135 } 136 137 @Override 138 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 139 if (QUEUES_EP.equals(extensionPoint)) { 140 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 141 } else if (IMPL_EP.equals(extensionPoint)) { 142 registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution); 143 } else { 144 throw new RuntimeException("Unknown extension point: " + extensionPoint); 145 } 146 } 147 148 @Override 149 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 150 if (QUEUES_EP.equals(extensionPoint)) { 151 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 152 } else if (IMPL_EP.equals(extensionPoint)) { 153 unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution); 154 } else { 155 throw new RuntimeException("Unknown extension point: " + extensionPoint); 156 } 157 } 158 159 public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 160 String queueId = workQueueDescriptor.id; 161 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 162 Boolean processing = workQueueDescriptor.processing; 163 Boolean queuing = workQueueDescriptor.queuing; 164 if (processing == null && queuing == null) { 165 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 166 + " with no processing/queuing"); 167 return; 168 } 169 String what = processing == null ? "" : (" processing=" + processing); 170 what += queuing == null ? "" : (" queuing=" + queuing); 171 log.info("Setting on all work queues:" + what); 172 // activate/deactivate processing/queuing on all queues 173 List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy 174 for (String id : queueIds) { 175 // add an updated contribution redefining processing/queuing 176 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 177 wqd.id = id; 178 wqd.processing = processing; 179 wqd.queuing = queuing; 180 registerWorkQueueDescriptor(wqd); 181 } 182 return; 183 } 184 workQueueDescriptors.addContribution(workQueueDescriptor); 185 WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId); 186 log.info("Registered work queue " + queueId + " " + wqd.toString()); 187 } 188 189 public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 190 String id = workQueueDescriptor.id; 191 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 192 return; 193 } 194 workQueueDescriptors.removeContribution(workQueueDescriptor); 195 log.info("Unregistered work queue " + id); 196 } 197 198 protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) { 199 String id = workQueueDescriptor.id; 200 WorkThreadPoolExecutor executor = executors.get(id); 201 if (executor == null) { 202 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-"); 203 int maxPoolSize = workQueueDescriptor.getMaxThreads(); 204 executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory); 205 // prestart all core threads so that direct additions to the queue 206 // (from another Nuxeo instance) can be seen 207 executor.prestartAllCoreThreads(); 208 executors.put(id, executor); 209 } 210 NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue(); 211 // get merged contrib 212 // set active state 213 queue.setActive(workQueueDescriptor.isProcessingEnabled()); 214 log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString()); 215 } 216 217 public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) { 218 if (WorkQueueDescriptor.ALL_QUEUES.equals(workQueueDescriptor.id)) { 219 return; 220 } 221 WorkThreadPoolExecutor executor = executors.get(workQueueDescriptor.id); 222 executor.shutdownAndSuspend(); 223 log.info("Deactivated work queue " + workQueueDescriptor.id); 224 } 225 226 public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) { 227 WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass()); 228 registerWorkQueuing(q); 229 } 230 231 public void registerWorkQueuing(WorkQueuing q) { 232 closeQueuing(); 233 queuing = q; 234 } 235 236 public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) { 237 unregisterWorkQueing(); 238 } 239 240 public void unregisterWorkQueing() { 241 closeQueuing(); 242 queuing = newWorkQueuing(MemoryWorkQueuing.class); 243 } 244 245 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 246 WorkQueuing q; 247 try { 248 Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class, 249 WorkQueueDescriptorRegistry.class); 250 q = ctor.newInstance(this, workQueueDescriptors); 251 } catch (ReflectiveOperationException | SecurityException e) { 252 throw new RuntimeException(e); 253 } 254 return q; 255 } 256 257 protected void closeQueuing() { 258 try { 259 shutdown(5, TimeUnit.SECONDS); 260 } catch (InterruptedException e) { 261 Thread.currentThread().interrupt(); // restore interrupted status 262 throw new RuntimeException(e); 263 } 264 } 265 266 protected boolean isQueuingEnabled(String queueId) { 267 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 268 return wqd == null ? false : wqd.isQueuingEnabled(); 269 } 270 271 protected boolean isProcessingEnabled(String queueId) { 272 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 273 return wqd == null ? false : wqd.isProcessingEnabled(); 274 } 275 276 // ----- WorkManager ----- 277 278 @Override 279 public List<String> getWorkQueueIds() { 280 synchronized (workQueueDescriptors) { 281 return workQueueDescriptors.getQueueIds(); 282 } 283 } 284 285 @Override 286 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 287 synchronized (workQueueDescriptors) { 288 return workQueueDescriptors.get(queueId); 289 } 290 } 291 292 @Override 293 public String getCategoryQueueId(String category) { 294 if (category == null) { 295 category = DEFAULT_CATEGORY; 296 } 297 String queueId = workQueueDescriptors.getQueueId(category); 298 if (queueId == null) { 299 queueId = DEFAULT_QUEUE_ID; 300 } 301 return queueId; 302 } 303 304 @Override 305 public int getApplicationStartedOrder() { 306 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 307 } 308 309 @Override 310 public void applicationStarted(ComponentContext context) { 311 init(); 312 } 313 314 protected volatile boolean started = false; 315 316 protected volatile boolean shutdownInProgress = false; 317 318 @Override 319 public void init() { 320 if (started) { 321 return; 322 } 323 synchronized (this) { 324 if (started) { 325 return; 326 } 327 started = true; 328 queuing.init(); 329 for (String id : workQueueDescriptors.getQueueIds()) { 330 activateQueue(workQueueDescriptors.get(id)); 331 } 332 } 333 } 334 335 protected WorkThreadPoolExecutor getExecutor(String queueId) { 336 if (!started) { 337 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 338 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 339 init(); 340 } else { 341 throw new IllegalStateException("Work manager not started, could not access to executors"); 342 } 343 } 344 WorkQueueDescriptor workQueueDescriptor; 345 synchronized (workQueueDescriptors) { 346 workQueueDescriptor = workQueueDescriptors.get(queueId); 347 } 348 if (workQueueDescriptor == null) { 349 throw new IllegalArgumentException("No such work queue: " + queueId); 350 } 351 352 return executors.get(queueId); 353 } 354 355 @Override 356 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 357 WorkThreadPoolExecutor executor = getExecutor(queueId); 358 boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit); 359 removeExecutor(queueId); // start afresh 360 return terminated; 361 } 362 363 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 364 throws InterruptedException { 365 // mark executors as shutting down 366 for (WorkThreadPoolExecutor executor : list) { 367 executor.shutdownAndSuspend(); 368 } 369 370 long t0 = System.currentTimeMillis(); 371 long delay = unit.toMillis(timeout); 372 373 // wait for termination or suspension 374 boolean terminated = true; 375 for (WorkThreadPoolExecutor executor : list) { 376 long remaining = remainingMillis(t0, delay); 377 if (!executor.awaitTerminationOrSave(remaining, TimeUnit.MILLISECONDS)) { 378 terminated = false; 379 // hard shutdown for remaining tasks 380 executor.shutdownNow(); 381 } 382 } 383 384 return terminated; 385 } 386 387 protected long remainingMillis(long t0, long delay) { 388 long d = System.currentTimeMillis() - t0; 389 if (d > delay) { 390 return 0; 391 } 392 return delay - d; 393 } 394 395 protected synchronized void removeExecutor(String queueId) { 396 executors.remove(queueId); 397 } 398 399 @Override 400 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 401 shutdownInProgress = true; 402 try { 403 return shutdownExecutors(executors.values(), timeout, unit); 404 } finally { 405 shutdownInProgress = false; 406 started = false; 407 executors.clear(); 408 } 409 } 410 411 protected class ShutdownListener implements RuntimeServiceListener { 412 @Override 413 public void handleEvent(RuntimeServiceEvent event) { 414 if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) { 415 return; 416 } 417 Framework.removeListener(this); 418 closeQueuing(); 419 } 420 } 421 422 /** 423 * A work instance and how to schedule it, for schedule-after-commit. 424 * 425 * @since 5.8 426 */ 427 public class WorkScheduling implements Synchronization { 428 public final Work work; 429 430 public final Scheduling scheduling; 431 432 public WorkScheduling(Work work, Scheduling scheduling) { 433 this.work = work; 434 this.scheduling = scheduling; 435 } 436 437 @Override 438 public void beforeCompletion() { 439 } 440 441 @Override 442 public void afterCompletion(int status) { 443 if (status == Status.STATUS_COMMITTED) { 444 schedule(work, scheduling, false); 445 } else if (status == Status.STATUS_ROLLEDBACK) { 446 work.setWorkInstanceState(State.CANCELED); 447 } else { 448 throw new IllegalArgumentException("Unsupported transaction status " + status); 449 } 450 } 451 } 452 453 /** 454 * Creates non-daemon threads at normal priority. 455 */ 456 private static class NamedThreadFactory implements ThreadFactory { 457 458 private final AtomicInteger threadNumber = new AtomicInteger(); 459 460 private final ThreadGroup group; 461 462 private final String prefix; 463 464 public NamedThreadFactory(String prefix) { 465 SecurityManager sm = System.getSecurityManager(); 466 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 467 this.prefix = prefix; 468 } 469 470 @Override 471 public Thread newThread(Runnable r) { 472 String name = prefix + threadNumber.incrementAndGet(); 473 Thread thread = new Thread(group, r, name); 474 // do not set daemon 475 thread.setPriority(Thread.NORM_PRIORITY); 476 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 477 478 @Override 479 public void uncaughtException(Thread t, Throwable e) { 480 LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e); 481 } 482 }); 483 return thread; 484 } 485 } 486 487 /** 488 * A handler for rejected tasks that discards them. 489 */ 490 public static class CancelingPolicy implements RejectedExecutionHandler { 491 492 public static final CancelingPolicy INSTANCE = new CancelingPolicy(); 493 494 @Override 495 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 496 ((WorkThreadPoolExecutor) executor).removedFromQueue(r); 497 } 498 } 499 500 501 /** 502 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 503 * <p> 504 * Completed tasks are passed to another queue. 505 * <p> 506 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 507 * persisted, etc). 508 * 509 * @since 5.6 510 */ 511 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 512 513 protected final String queueId; 514 515 /** 516 * List of running Work instances, in order to be able to interrupt them if requested. 517 */ 518 // @GuardedBy("itself") 519 protected final List<Work> running; 520 521 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 522 // Number of work scheduled by this instance 523 protected final Counter scheduledCount; 524 525 // Number of work currently running on this instance 526 protected final Counter runningCount; 527 528 // Number of work completed by this instance 529 protected final Counter completedCount; 530 531 protected final Timer workTimer; 532 533 protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime, 534 TimeUnit unit, ThreadFactory threadFactory) { 535 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initWorkQueue(queueId), threadFactory); 536 this.queueId = queueId; 537 running = new LinkedList<Work>(); 538 // init metrics 539 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 540 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 541 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 542 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 543 } 544 545 public int getScheduledOrRunningSize() { 546 int ret = 0; 547 for (String queueId : getWorkQueueIds()) { 548 ret += getQueueSize(queueId, null); 549 } 550 return ret; 551 } 552 553 @Override 554 public void execute(Runnable r) { 555 throw new UnsupportedOperationException("use other api"); 556 } 557 558 /** 559 * Executes the given task sometime in the future. 560 * 561 * @param work the work to execute 562 * @see #execute(Runnable) 563 */ 564 public void execute(Work work) { 565 scheduledCount.inc(); 566 submit(work); 567 } 568 569 /** 570 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 571 * directly 572 * 573 * @param work 574 * @throws RuntimeException 575 */ 576 protected void submit(Work work) throws RuntimeException { 577 boolean added = queuing.workSchedule(queueId, work); 578 if (!added) { 579 queuing.removeScheduled(queueId, work.getId()); 580 throw new RuntimeException("queue should have blocked"); 581 } 582 // DO NOT super.execute(new WorkHolder(work)); 583 } 584 585 @Override 586 protected void beforeExecute(Thread t, Runnable r) { 587 Work work = WorkHolder.getWork(r); 588 work.setWorkInstanceState(State.RUNNING); 589 queuing.workRunning(queueId, work); 590 synchronized (running) { 591 running.add(work); 592 } 593 // metrics 594 runningCount.inc(); 595 } 596 597 @Override 598 protected void afterExecute(Runnable r, Throwable t) { 599 Work work = WorkHolder.getWork(r); 600 synchronized (running) { 601 running.remove(work); 602 } 603 State state; 604 if (t == null) { 605 if (work.isWorkInstanceSuspended()) { 606 state = State.SCHEDULED; 607 } else { 608 state = State.COMPLETED; 609 } 610 } else { 611 state = State.FAILED; 612 } 613 work.setWorkInstanceState(state); 614 queuing.workCompleted(queueId, work); 615 // metrics 616 runningCount.dec(); 617 completedCount.inc(); 618 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 619 completionSynchronizer.signalCompletedWork(); 620 } 621 622 // called during shutdown 623 // with tasks from the queue if new tasks are submitted 624 // or with tasks drained from the queue 625 protected void removedFromQueue(Runnable r) { 626 Work work = WorkHolder.getWork(r); 627 work.setWorkInstanceState(State.CANCELED); 628 completionSynchronizer.signalCompletedWork(); 629 } 630 631 /** 632 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. The scheduled work 633 * instances are drained and suspended. 634 */ 635 public void shutdownAndSuspend() { 636 // rejected tasks will be discarded 637 setRejectedExecutionHandler(CancelingPolicy.INSTANCE); 638 // shutdown the executor 639 // if a new task is scheduled it will be rejected -> discarded 640 shutdown(); 641 // request all scheduled work instances to suspend (cancel) 642 int n = queuing.setSuspending(queueId); 643 // request all running work instances to suspend (stop) 644 synchronized (running) { 645 for (Work work : running) { 646 work.setWorkInstanceSuspending(); 647 } 648 } 649 } 650 651 /** 652 * Blocks until all work instances have completed after a shutdown and suspend request. 653 * 654 * @param timeout the time to wait 655 * @param unit the timeout unit 656 * @return true if all work stopped or was saved, false if some remaining after timeout 657 */ 658 public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException { 659 boolean terminated = super.awaitTermination(timeout, unit); 660 if (!terminated) { 661 // drain queue from remaining scheduled work 662 List<Runnable> drained = new ArrayList<>(); 663 getQueue().drainTo(drained); 664 for (Runnable r : drained) { 665 removedFromQueue(r); 666 } 667 } 668 // some work still remaining after timeout 669 return terminated; 670 } 671 672 public Work removeScheduled(String workId) { 673 Work w = queuing.removeScheduled(queueId, workId); 674 return w; 675 } 676 677 } 678 679 @Override 680 public void schedule(Work work) { 681 schedule(work, Scheduling.ENQUEUE, false); 682 } 683 684 @Override 685 public void schedule(Work work, boolean afterCommit) { 686 schedule(work, Scheduling.ENQUEUE, afterCommit); 687 } 688 689 @Override 690 public void schedule(Work work, Scheduling scheduling) { 691 schedule(work, scheduling, false); 692 } 693 694 @Override 695 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 696 String workId = work.getId(); 697 String queueId = getCategoryQueueId(work.getCategory()); 698 if (!isQueuingEnabled(queueId)) { 699 return; 700 } 701 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 702 return; 703 } 704 work.setWorkInstanceState(State.SCHEDULED); 705 WorkSchedulePath.newInstance(work); 706 if (log.isTraceEnabled()) { 707 log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack()); 708 } else if (log.isDebugEnabled()) { 709 log.debug("Scheduling work: " + work + " using queue: " + queueId); 710 } 711 switch (scheduling) { 712 case ENQUEUE: 713 break; 714 case CANCEL_SCHEDULED: 715 Work w = getExecutor(queueId).removeScheduled(workId); 716 if (w != null) { 717 w.setWorkInstanceState(State.CANCELED); 718 if (log.isDebugEnabled()) { 719 log.debug("Canceling existing scheduled work before scheduling"); 720 } 721 } 722 break; 723 case IF_NOT_SCHEDULED: 724 case IF_NOT_RUNNING: 725 case IF_NOT_RUNNING_OR_SCHEDULED: 726 // TODO disabled for now because hasWorkInState uses isScheduled 727 // which is buggy 728 boolean disabled = Boolean.TRUE.booleanValue(); 729 if (!disabled && hasWorkInState(workId, scheduling.state)) { 730 // mark passed work as canceled 731 work.setWorkInstanceState(State.CANCELED); 732 if (log.isDebugEnabled()) { 733 log.debug("Canceling schedule because found: " + scheduling); 734 } 735 return; 736 737 } 738 break; 739 740 } 741 getExecutor(queueId).execute(work); 742 } 743 744 /** 745 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 746 * 747 * @since 5.8 748 */ 749 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 750 TransactionManager transactionManager; 751 try { 752 transactionManager = TransactionHelper.lookupTransactionManager(); 753 } catch (NamingException e) { 754 transactionManager = null; 755 } 756 if (transactionManager == null) { 757 if (log.isDebugEnabled()) { 758 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 759 } 760 return false; 761 } 762 try { 763 Transaction transaction = transactionManager.getTransaction(); 764 if (transaction == null) { 765 if (log.isDebugEnabled()) { 766 log.debug("Not scheduling work after commit because of missing transaction: " + work); 767 } 768 return false; 769 } 770 int status = transaction.getStatus(); 771 if (status == Status.STATUS_ACTIVE) { 772 if (log.isDebugEnabled()) { 773 log.debug("Scheduling work after commit: " + work); 774 } 775 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 776 return true; 777 } else { 778 if (log.isDebugEnabled()) { 779 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 780 + work); 781 } 782 return false; 783 } 784 } catch (SystemException | RollbackException e) { 785 log.error("Cannot schedule after commit", e); 786 return false; 787 } 788 } 789 790 @Override 791 @Deprecated 792 public Work find(Work work, State state, boolean useEquals, int[] pos) { 793 if (pos != null) { 794 pos[0] = 0; // compat 795 } 796 String workId = work.getId(); 797 return queuing.find(workId, state); 798 } 799 800 @Override 801 public Work find(String workId, State state) { 802 return queuing.find(workId, state); 803 } 804 805 @Override 806 public String findResult(String workId) { 807 Work work = find(workId, State.COMPLETED); 808 return work != null ? work.getWorkInstanceResult() : null; 809 } 810 811 /** 812 * @param state SCHEDULED, RUNNING or null for both 813 */ 814 protected boolean hasWorkInState(String workId, State state) { 815 return queuing.isWorkInState(workId, state); 816 } 817 818 @Override 819 public State getWorkState(String workId) { 820 return queuing.getWorkState(workId); 821 } 822 823 @Override 824 public List<Work> listWork(String queueId, State state) { 825 // don't return scheduled after commit 826 return queuing.listWork(queueId, state); 827 } 828 829 @Override 830 public List<String> listWorkIds(String queueId, State state) { 831 return queuing.listWorkIds(queueId, state); 832 } 833 834 @Override 835 public int getQueueSize(String queueId, State state) { 836 if (state == null) { 837 return getScheduledOrRunningSize(queueId); 838 } 839 if (state == State.SCHEDULED) { 840 return getScheduledSize(queueId); 841 } else if (state == State.RUNNING) { 842 return getRunningSize(queueId); 843 } else if (state == State.COMPLETED) { 844 return getCompletedSize(queueId); 845 } else { 846 throw new IllegalArgumentException(String.valueOf(state)); 847 } 848 } 849 850 @Override 851 @Deprecated 852 public int getNonCompletedWorkSize(String queueId) { 853 return getScheduledOrRunningSize(queueId); 854 } 855 856 protected int getScheduledSize(String queueId) { 857 return queuing.count(queueId, State.SCHEDULED); 858 } 859 860 protected int getRunningSize(String queueId) { 861 return queuing.count(queueId, State.RUNNING); 862 } 863 864 protected int getScheduledOrRunningSize(String queueId) { 865 return getScheduledSize(queueId) + getRunningSize(queueId); 866 } 867 868 protected int getCompletedSize(String queueId) { 869 return queuing.count(queueId, State.COMPLETED); 870 } 871 872 @Override 873 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 874 return awaitCompletion(null, duration, unit); 875 } 876 877 @Override 878 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 879 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 880 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 881 long deadline = getTimestampAfter(durationInMs); 882 int pause = (int) Math.min(duration, 500L); 883 log.debug("awaitForCompletion " + durationInMs + " ms"); 884 do { 885 if (noScheduledOrRunningWork(queueId)) { 886 log.debug("Completed"); 887 completionSynchronizer.signalCompletedWork(); 888 SequenceTracer.stop("done"); 889 return true; 890 } 891 completionSynchronizer.waitForCompletedWork(pause); 892 } while (System.currentTimeMillis() < deadline); 893 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 894 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 895 return false; 896 } 897 898 protected long getTimestampAfter(long durationInMs) { 899 long ret = System.currentTimeMillis() + durationInMs; 900 if (ret < 0) { 901 ret = Long.MAX_VALUE; 902 } 903 return ret; 904 } 905 906 protected boolean noScheduledOrRunningWork(String queueId) { 907 if (queueId != null) { 908 boolean ret = getQueueSize(queueId, null) == 0; 909 if (ret == false) { 910 if (log.isTraceEnabled()) { 911 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + 912 ", running: " + getQueueSize(queueId, State.RUNNING)); 913 } 914 } 915 return ret; 916 } 917 for (String id : getWorkQueueIds()) { 918 if (getQueueSize(id, null) > 0) { 919 if (log.isTraceEnabled()) { 920 log.trace(id + " not empty, sched: " + getQueueSize(id, State.SCHEDULED) + 921 ", running: " + getQueueSize(id, State.RUNNING)); 922 } 923 return false; 924 } 925 } 926 return true; 927 } 928 929 @Override 930 public synchronized void clearCompletedWork(String queueId) { 931 queuing.clearCompletedWork(queueId, 0); 932 } 933 934 @Override 935 public synchronized void clearCompletedWork(long completionTime) { 936 for (String queueId : queuing.getCompletedQueueIds()) { 937 queuing.clearCompletedWork(queueId, completionTime); 938 } 939 } 940 941 @Override 942 public synchronized void cleanup() { 943 log.debug("Clearing old completed work"); 944 for (String queueId : queuing.getCompletedQueueIds()) { 945 WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId); 946 if (workQueueDescriptor == null) { 947 // unknown queue 948 continue; 949 } 950 long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L; 951 if (delay > 0) { 952 long completionTime = System.currentTimeMillis() - delay; 953 queuing.clearCompletedWork(queueId, completionTime); 954 } 955 } 956 } 957 958 @Override 959 public boolean isStarted() { 960 return started && !shutdownInProgress; 961 } 962 963}