001/* 002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.ecm.core.api.NuxeoException; 049import org.nuxeo.ecm.core.event.EventServiceComponent; 050import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 051import org.nuxeo.ecm.core.work.api.Work; 052import org.nuxeo.ecm.core.work.api.Work.State; 053import org.nuxeo.ecm.core.work.api.WorkManager; 054import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 055import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 056import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 057import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 058import org.nuxeo.runtime.api.Framework; 059import org.nuxeo.runtime.metrics.MetricsService; 060import org.nuxeo.runtime.metrics.NuxeoMetricSet; 061import org.nuxeo.runtime.model.ComponentContext; 062import org.nuxeo.runtime.model.ComponentInstance; 063import org.nuxeo.runtime.model.ComponentManager; 064import org.nuxeo.runtime.model.DefaultComponent; 065import org.nuxeo.runtime.transaction.TransactionHelper; 066 067import com.codahale.metrics.Counter; 068import com.codahale.metrics.MetricRegistry; 069import com.codahale.metrics.SharedMetricRegistries; 070import com.codahale.metrics.Timer; 071 072/** 073 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 074 * implementation. 075 * 076 * @since 5.6 077 */ 078public class WorkManagerImpl extends DefaultComponent implements WorkManager { 079 080 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 081 082 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 083 084 protected static final String QUEUES_EP = "queues"; 085 086 protected static final String IMPL_EP = "implementation"; 087 088 public static final String DEFAULT_QUEUE_ID = "default"; 089 090 public static final String DEFAULT_CATEGORY = "default"; 091 092 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 093 094 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 095 096 // @GuardedBy("itself") 097 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 098 099 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 100 101 // used synchronized 102 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 103 104 protected WorkQueuing queuing; 105 106 /** 107 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 108 * node in cluster mode. 109 */ 110 protected class WorkCompletionSynchronizer { 111 final protected ReentrantLock lock = new ReentrantLock(); 112 113 final protected Condition condition = lock.newCondition(); 114 115 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 116 lock.lock(); 117 try { 118 return condition.await(timeMs, TimeUnit.MILLISECONDS); 119 } finally { 120 lock.unlock(); 121 } 122 } 123 124 protected void signalCompletedWork() { 125 lock.lock(); 126 try { 127 condition.signalAll(); 128 } finally { 129 lock.unlock(); 130 } 131 } 132 133 } 134 135 protected WorkCompletionSynchronizer completionSynchronizer; 136 137 @Override 138 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 139 if (QUEUES_EP.equals(extensionPoint)) { 140 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 141 } else if (IMPL_EP.equals(extensionPoint)) { 142 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 143 } else { 144 throw new RuntimeException("Unknown extension point: " + extensionPoint); 145 } 146 } 147 148 @Override 149 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 150 if (QUEUES_EP.equals(extensionPoint)) { 151 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 152 } else if (IMPL_EP.equals(extensionPoint)) { 153 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 154 } else { 155 throw new RuntimeException("Unknown extension point: " + extensionPoint); 156 } 157 } 158 159 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 160 String queueId = workQueueDescriptor.id; 161 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 162 Boolean processing = workQueueDescriptor.processing; 163 if (processing == null) { 164 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 165 + " with no processing/queuing"); 166 return; 167 } 168 String what = " processing=" + processing; 169 what += queuing == null ? "" : (" queuing=" + queuing); 170 log.info("Setting on all work queues:" + what); 171 // activate/deactivate processing/queuing on all queues 172 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 173 for (String id : queueIds) { 174 // add an updated contribution redefining processing/queuing 175 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 176 wqd.id = id; 177 wqd.processing = processing; 178 registerWorkQueueDescriptor(wqd); 179 } 180 return; 181 } 182 workQueueConfig.addContribution(workQueueDescriptor); 183 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 184 log.info("Registered work queue " + queueId + " " + wqd.toString()); 185 } 186 187 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 188 String id = workQueueDescriptor.id; 189 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 190 return; 191 } 192 workQueueConfig.removeContribution(workQueueDescriptor); 193 log.info("Unregistered work queue " + id); 194 } 195 196 void initializeQueue(WorkQueueDescriptor config) { 197 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 198 throw new IllegalArgumentException("cannot initialize all queues"); 199 } 200 if (queuing.getQueue(config.id) != null) { 201 throw new IllegalStateException("work queue " + config.id + " is already initialized"); 202 } 203 if (executors.containsKey(config.id)) { 204 throw new IllegalStateException("work queue " + config.id + " already have an executor"); 205 } 206 NuxeoBlockingQueue queue = queuing.init(config); 207 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 208 int maxPoolSize = config.getMaxThreads(); 209 WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, 210 queue, threadFactory); 211 // prestart all core threads so that direct additions to the queue 212 // (from another Nuxeo instance) can be seen 213 executor.prestartAllCoreThreads(); 214 executors.put(config.id, executor); 215 log.info("Initialized work queue " + config.id + " " + config.toEffectiveString()); 216 } 217 218 void activateQueue(WorkQueueDescriptor config) { 219 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 220 throw new IllegalArgumentException("cannot activate all queues"); 221 } 222 queuing.setActive(config.id, config.isProcessingEnabled()); 223 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 224 // Enable metrics 225 if (config.isProcessingEnabled()) { 226 activateQueueMetrics(config.id); 227 } 228 } 229 230 void deactivateQueue(WorkQueueDescriptor config) { 231 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 232 throw new IllegalArgumentException("cannot deactivate all queues"); 233 } 234 // Disable metrics 235 if (config.isProcessingEnabled()) { 236 deactivateQueueMetrics(config.id); 237 } 238 queuing.setActive(config.id, false); 239 log.info("Deactivated work queue " + config.id); 240 } 241 242 void activateQueueMetrics(String queueId) { 243 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 244 queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled", "count"); 245 queueMetrics.putGauge(() -> getMetrics(queueId).running, "running"); 246 queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed"); 247 queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled"); 248 registry.registerAll(queueMetrics); 249 } 250 251 void deactivateQueueMetrics(String queueId) { 252 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 253 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 254 } 255 256 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 257 workQueuingConfig.addContribution(descr); 258 } 259 260 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 261 workQueuingConfig.removeContribution(descr); 262 } 263 264 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 265 try { 266 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 267 } catch (ReflectiveOperationException | SecurityException e) { 268 throw new RuntimeException(e); 269 } 270 } 271 272 @Override 273 public boolean isQueuingEnabled(String queueId) { 274 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 275 return wqd != null && wqd.isQueuingEnabled(); 276 } 277 278 @Override 279 public void enableProcessing(boolean value) { 280 workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value)); 281 } 282 283 @Override 284 public void enableProcessing(String queueId, boolean value) { 285 WorkQueueDescriptor config = workQueueConfig.get(queueId); 286 if (config == null) { 287 throw new IllegalArgumentException("no such queue " + queueId); 288 } 289 if (!value) { 290 deactivateQueue(config); 291 } else { 292 activateQueue(config); 293 } 294 } 295 296 @Override 297 public boolean isProcessingEnabled() { 298 for (String queueId : workQueueConfig.getQueueIds()) { 299 if (queuing.getQueue(queueId).active) { 300 return true; 301 } 302 } 303 return false; 304 } 305 306 @Override 307 public boolean isProcessingEnabled(String queueId) { 308 if (queueId == null) { 309 return isProcessingEnabled(); 310 } 311 return queuing.getQueue(queueId).active; 312 } 313 314 // ----- WorkManager ----- 315 316 @Override 317 public List<String> getWorkQueueIds() { 318 synchronized (workQueueConfig) { 319 return workQueueConfig.getQueueIds(); 320 } 321 } 322 323 @Override 324 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 325 synchronized (workQueueConfig) { 326 return workQueueConfig.get(queueId); 327 } 328 } 329 330 @Override 331 public String getCategoryQueueId(String category) { 332 if (category == null) { 333 category = DEFAULT_CATEGORY; 334 } 335 String queueId = workQueueConfig.getQueueId(category); 336 if (queueId == null) { 337 queueId = DEFAULT_QUEUE_ID; 338 } 339 return queueId; 340 } 341 342 @Override 343 public int getApplicationStartedOrder() { 344 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 345 } 346 347 @Override 348 public void start(ComponentContext context) { 349 init(); 350 } 351 352 @Override 353 public void stop(ComponentContext context) throws InterruptedException { 354 // do nothing: we shutdown the thread pool at first before stopping any component 355 // see LifeCycleHandler.beforeStop 356 } 357 358 protected volatile boolean started = false; 359 360 protected volatile boolean shutdownInProgress = false; 361 362 @Override 363 public void init() { 364 if (started) { 365 return; 366 } 367 synchronized (this) { 368 if (started) { 369 return; 370 } 371 queuing = newWorkQueuing(workQueuingConfig.klass); 372 completionSynchronizer = new WorkCompletionSynchronizer(); 373 started = true; 374 workQueueConfig.index(); 375 for (String id : workQueueConfig.getQueueIds()) { 376 initializeQueue(workQueueConfig.get(id)); 377 } 378 379 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.LifeCycleHandler() { 380 @Override 381 public void beforeStop(ComponentManager mgr, boolean isStandby) { 382 for (String id : workQueueConfig.getQueueIds()) { 383 deactivateQueue(workQueueConfig.get(id)); 384 } 385 try { 386 if (!shutdown(10, TimeUnit.SECONDS)) { 387 log.error("Some processors are still active"); 388 } 389 } catch (InterruptedException e) { 390 Thread.currentThread().interrupt(); 391 throw new NuxeoException("Interrupted while stopping work manager thread pools", e); 392 } 393 } 394 395 @Override 396 public void afterStart(ComponentManager mgr, boolean isResume) { 397 for (String id : workQueueConfig.getQueueIds()) { 398 activateQueue(workQueueConfig.get(id)); 399 } 400 } 401 402 @Override 403 public void afterStop(ComponentManager mgr, boolean isStandby) { 404 Framework.getRuntime().getComponentManager().removeListener(this); 405 } 406 }); 407 } 408 } 409 410 protected WorkThreadPoolExecutor getExecutor(String queueId) { 411 if (!started) { 412 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 413 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 414 init(); 415 } else { 416 throw new IllegalStateException("Work manager not started, could not access to executors"); 417 } 418 } 419 WorkQueueDescriptor workQueueDescriptor; 420 synchronized (workQueueConfig) { 421 workQueueDescriptor = workQueueConfig.get(queueId); 422 } 423 if (workQueueDescriptor == null) { 424 throw new IllegalArgumentException("No such work queue: " + queueId); 425 } 426 427 return executors.get(queueId); 428 } 429 430 @Override 431 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 432 WorkThreadPoolExecutor executor = getExecutor(queueId); 433 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 434 } 435 436 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 437 throws InterruptedException { 438 // mark executors as shutting down 439 for (WorkThreadPoolExecutor executor : list) { 440 executor.shutdownAndSuspend(); 441 } 442 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 443 // wait until threads termination 444 for (WorkThreadPoolExecutor executor : list) { 445 long t0 = System.currentTimeMillis(); 446 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 447 return false; 448 } 449 timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS); 450 } 451 return true; 452 } 453 454 protected long remainingMillis(long t0, long delay) { 455 long d = System.currentTimeMillis() - t0; 456 if (d > delay) { 457 return 0; 458 } 459 return delay - d; 460 } 461 462 protected synchronized void removeExecutor(String queueId) { 463 executors.remove(queueId); 464 } 465 466 @Override 467 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 468 shutdownInProgress = true; 469 try { 470 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 471 } finally { 472 shutdownInProgress = false; 473 started = false; 474 } 475 } 476 477 /** 478 * A work instance and how to schedule it, for schedule-after-commit. 479 * 480 * @since 5.8 481 */ 482 public class WorkScheduling implements Synchronization { 483 public final Work work; 484 485 public final Scheduling scheduling; 486 487 public WorkScheduling(Work work, Scheduling scheduling) { 488 this.work = work; 489 this.scheduling = scheduling; 490 } 491 492 @Override 493 public void beforeCompletion() { 494 } 495 496 @Override 497 public void afterCompletion(int status) { 498 if (status == Status.STATUS_COMMITTED) { 499 schedule(work, scheduling, false); 500 } else if (status == Status.STATUS_ROLLEDBACK) { 501 work.setWorkInstanceState(State.UNKNOWN); 502 } else { 503 throw new IllegalArgumentException("Unsupported transaction status " + status); 504 } 505 } 506 } 507 508 /** 509 * Creates non-daemon threads at normal priority. 510 */ 511 private static class NamedThreadFactory implements ThreadFactory { 512 513 private final AtomicInteger threadNumber = new AtomicInteger(); 514 515 private final ThreadGroup group; 516 517 private final String prefix; 518 519 public NamedThreadFactory(String prefix) { 520 SecurityManager sm = System.getSecurityManager(); 521 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 522 this.prefix = prefix; 523 } 524 525 @Override 526 public Thread newThread(Runnable r) { 527 String name = prefix + threadNumber.incrementAndGet(); 528 Thread thread = new Thread(group, r, name); 529 // do not set daemon 530 thread.setPriority(Thread.NORM_PRIORITY); 531 thread.setUncaughtExceptionHandler((t, 532 e) -> LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e)); 533 return thread; 534 } 535 } 536 537 /** 538 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 539 * <p> 540 * Completed tasks are passed to another queue. 541 * <p> 542 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 543 * persisted, etc). 544 * 545 * @since 5.6 546 */ 547 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 548 549 protected final String queueId; 550 551 /** 552 * List of running Work instances, in order to be able to interrupt them if requested. 553 */ 554 // @GuardedBy("itself") 555 protected final ConcurrentLinkedQueue<Work> running; 556 557 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 558 // Number of work scheduled by this instance 559 protected final Counter scheduledCount; 560 561 // Number of work currently running on this instance 562 protected final Counter runningCount; 563 564 // Number of work completed by this instance 565 protected final Counter completedCount; 566 567 protected final Timer workTimer; 568 569 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 570 NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 571 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 572 queueId = queue.queueId; 573 running = new ConcurrentLinkedQueue<>(); 574 // init metrics 575 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 576 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 577 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 578 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 579 } 580 581 public int getScheduledOrRunningSize() { 582 int ret = 0; 583 for (String queueId : getWorkQueueIds()) { 584 ret += getQueueSize(queueId, null); 585 } 586 return ret; 587 } 588 589 @Override 590 public void execute(Runnable r) { 591 throw new UnsupportedOperationException("use other api"); 592 } 593 594 /** 595 * Executes the given task sometime in the future. 596 * 597 * @param work the work to execute 598 * @see #execute(Runnable) 599 */ 600 public void execute(Work work) { 601 scheduledCount.inc(); 602 submit(work); 603 } 604 605 /** 606 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 607 * directly 608 */ 609 protected void submit(Work work) throws RuntimeException { 610 queuing.workSchedule(queueId, work); 611 } 612 613 @Override 614 protected void beforeExecute(Thread t, Runnable r) { 615 Work work = WorkHolder.getWork(r); 616 if (isShutdown()) { 617 work.setWorkInstanceState(State.SCHEDULED); 618 queuing.workReschedule(queueId, work); 619 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 620 } 621 work.setWorkInstanceState(State.RUNNING); 622 queuing.workRunning(queueId, work); 623 running.add(work); 624 runningCount.inc(); 625 } 626 627 @Override 628 protected void afterExecute(Runnable r, Throwable t) { 629 Work work = WorkHolder.getWork(r); 630 try { 631 if (work.isSuspending()) { 632 log.trace(work + " is suspending, giving up"); 633 return; 634 } 635 if (isShutdown()) { 636 log.trace("rescheduling " + work.getId(), t); 637 work.setWorkInstanceState(State.SCHEDULED); 638 queuing.workReschedule(queueId, work); 639 return; 640 } 641 work.setWorkInstanceState(State.UNKNOWN); 642 queuing.workCompleted(queueId, work); 643 } finally { 644 running.remove(work); 645 runningCount.dec(); 646 completedCount.inc(); 647 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 648 completionSynchronizer.signalCompletedWork(); 649 } 650 } 651 652 /** 653 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 654 * 655 * @throws InterruptedException 656 */ 657 public void shutdownAndSuspend() throws InterruptedException { 658 try { 659 // don't consume the queue anymore 660 deactivateQueueMetrics(queueId); 661 queuing.setActive(queueId, false); 662 // suspend all running work 663 for (Work work : running) { 664 work.setWorkInstanceSuspending(); 665 log.trace("suspending and rescheduling " + work.getId()); 666 work.setWorkInstanceState(State.SCHEDULED); 667 queuing.workReschedule(queueId, work); 668 } 669 shutdownNow(); 670 } finally { 671 executors.remove(queueId); 672 } 673 } 674 675 public void removeScheduled(String workId) { 676 queuing.removeScheduled(queueId, workId); 677 } 678 679 } 680 681 @Override 682 public void schedule(Work work) { 683 schedule(work, Scheduling.ENQUEUE, false); 684 } 685 686 @Override 687 public void schedule(Work work, boolean afterCommit) { 688 schedule(work, Scheduling.ENQUEUE, afterCommit); 689 } 690 691 @Override 692 public void schedule(Work work, Scheduling scheduling) { 693 schedule(work, scheduling, false); 694 } 695 696 @Override 697 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 698 String workId = work.getId(); 699 String queueId = getCategoryQueueId(work.getCategory()); 700 if (!isQueuingEnabled(queueId)) { 701 return; 702 } 703 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 704 return; 705 } 706 work.setWorkInstanceState(State.SCHEDULED); 707 WorkSchedulePath.newInstance(work); 708 switch (scheduling) { 709 case ENQUEUE: 710 break; 711 case CANCEL_SCHEDULED: 712 getExecutor(queueId).removeScheduled(workId); 713 break; 714 case IF_NOT_SCHEDULED: 715 case IF_NOT_RUNNING_OR_SCHEDULED: 716 // TODO disabled for now because hasWorkInState uses isScheduled 717 // which is buggy 718 boolean disabled = Boolean.TRUE.booleanValue(); 719 if (!disabled && hasWorkInState(workId, scheduling.state)) { 720 if (log.isDebugEnabled()) { 721 log.debug("Canceling schedule because found: " + scheduling); 722 } 723 return; 724 725 } 726 break; 727 728 } 729 queuing.workSchedule(queueId, work); 730 } 731 732 /** 733 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 734 * 735 * @since 5.8 736 */ 737 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 738 TransactionManager transactionManager; 739 try { 740 transactionManager = TransactionHelper.lookupTransactionManager(); 741 } catch (NamingException e) { 742 transactionManager = null; 743 } 744 if (transactionManager == null) { 745 if (log.isDebugEnabled()) { 746 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 747 } 748 return false; 749 } 750 try { 751 Transaction transaction = transactionManager.getTransaction(); 752 if (transaction == null) { 753 if (log.isDebugEnabled()) { 754 log.debug("Not scheduling work after commit because of missing transaction: " + work); 755 } 756 return false; 757 } 758 int status = transaction.getStatus(); 759 if (status == Status.STATUS_ACTIVE) { 760 if (log.isDebugEnabled()) { 761 log.debug("Scheduling work after commit: " + work); 762 } 763 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 764 return true; 765 } else if (status == Status.STATUS_COMMITTED) { 766 // called in afterCompletion, we can schedule immediately 767 if (log.isDebugEnabled()) { 768 log.debug("Scheduling work immediately: " + work); 769 } 770 return false; 771 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 772 if (log.isDebugEnabled()) { 773 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 774 } 775 return true; 776 } else { 777 if (log.isDebugEnabled()) { 778 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 779 + work); 780 } 781 return false; 782 } 783 } catch (SystemException | RollbackException e) { 784 log.error("Cannot schedule after commit", e); 785 return false; 786 } 787 } 788 789 @Override 790 public Work find(String workId, State state) { 791 return queuing.find(workId, state); 792 } 793 794 /** 795 * @param state SCHEDULED, RUNNING or null for both 796 */ 797 protected boolean hasWorkInState(String workId, State state) { 798 return queuing.isWorkInState(workId, state); 799 } 800 801 @Override 802 public State getWorkState(String workId) { 803 return queuing.getWorkState(workId); 804 } 805 806 @Override 807 public List<Work> listWork(String queueId, State state) { 808 // don't return scheduled after commit 809 return queuing.listWork(queueId, state); 810 } 811 812 @Override 813 public List<String> listWorkIds(String queueId, State state) { 814 return queuing.listWorkIds(queueId, state); 815 } 816 817 @Override 818 public WorkQueueMetrics getMetrics(String queueId) { 819 return queuing.metrics(queueId); 820 } 821 822 @Override 823 public int getQueueSize(String queueId, State state) { 824 WorkQueueMetrics metrics = getMetrics(queueId); 825 if (state == null) { 826 return metrics.scheduled.intValue() + metrics.running.intValue(); 827 } 828 if (state == State.SCHEDULED) { 829 return metrics.scheduled.intValue(); 830 } else if (state == State.RUNNING) { 831 return metrics.running.intValue(); 832 } else { 833 throw new IllegalArgumentException(String.valueOf(state)); 834 } 835 } 836 837 @Override 838 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 839 return awaitCompletion(null, duration, unit); 840 } 841 842 @Override 843 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 844 if (!isStarted()) { 845 return true; 846 } 847 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 848 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 849 long deadline = getTimestampAfter(durationInMs); 850 int pause = (int) Math.min(durationInMs, 500L); 851 log.debug("awaitForCompletion " + durationInMs + " ms"); 852 do { 853 if (noScheduledOrRunningWork(queueId)) { 854 completionSynchronizer.signalCompletedWork(); 855 SequenceTracer.stop("done"); 856 return true; 857 } 858 completionSynchronizer.waitForCompletedWork(pause); 859 } while (System.currentTimeMillis() < deadline); 860 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 861 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 862 return false; 863 } 864 865 protected long getTimestampAfter(long durationInMs) { 866 long ret = System.currentTimeMillis() + durationInMs; 867 if (ret < 0) { 868 ret = Long.MAX_VALUE; 869 } 870 return ret; 871 } 872 873 protected boolean noScheduledOrRunningWork(String queueId) { 874 if (queueId == null) { 875 for (String id : getWorkQueueIds()) { 876 if (!noScheduledOrRunningWork(id)) { 877 return false; 878 } 879 } 880 return true; 881 } 882 if (!isProcessingEnabled(queueId)) { 883 return getExecutor(queueId).runningCount.getCount() == 0L; 884 } 885 if (getQueueSize(queueId, null) > 0) { 886 if (log.isTraceEnabled()) { 887 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: " 888 + getQueueSize(queueId, State.RUNNING)); 889 } 890 return false; 891 } 892 if (log.isTraceEnabled()) { 893 log.trace(queueId + " is completed"); 894 } 895 return true; 896 } 897 898 @Override 899 public boolean isStarted() { 900 return started && !shutdownInProgress; 901 } 902 903}