001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.ecm.core.event.EventServiceComponent; 049import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 050import org.nuxeo.ecm.core.work.api.Work; 051import org.nuxeo.ecm.core.work.api.Work.State; 052import org.nuxeo.ecm.core.work.api.WorkManager; 053import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 054import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 055import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 056import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 057import org.nuxeo.runtime.RuntimeServiceEvent; 058import org.nuxeo.runtime.RuntimeServiceListener; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.metrics.MetricsService; 061import org.nuxeo.runtime.model.ComponentContext; 062import org.nuxeo.runtime.model.ComponentInstance; 063import org.nuxeo.runtime.model.DefaultComponent; 064import org.nuxeo.runtime.transaction.TransactionHelper; 065 066import com.codahale.metrics.Counter; 067import com.codahale.metrics.MetricRegistry; 068import com.codahale.metrics.SharedMetricRegistries; 069import com.codahale.metrics.Timer; 070 071/** 072 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 073 * implementation. 074 * 075 * @since 5.6 076 */ 077public class WorkManagerImpl extends DefaultComponent implements WorkManager { 078 079 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 080 081 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 082 083 protected static final String QUEUES_EP = "queues"; 084 085 protected static final String IMPL_EP = "implementation"; 086 087 public static final String DEFAULT_QUEUE_ID = "default"; 088 089 public static final String DEFAULT_CATEGORY = "default"; 090 091 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 092 093 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 094 095 // @GuardedBy("itself") 096 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 097 098 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 099 100 // used synchronized 101 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 102 103 protected WorkQueuing queuing; 104 105 /** 106 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 107 * node in cluster mode. 108 */ 109 protected class WorkCompletionSynchronizer { 110 final protected ReentrantLock lock = new ReentrantLock(); 111 112 final protected Condition condition = lock.newCondition(); 113 114 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 115 lock.lock(); 116 try { 117 return condition.await(timeMs, TimeUnit.MILLISECONDS); 118 } finally { 119 lock.unlock(); 120 } 121 } 122 123 protected void signalCompletedWork() { 124 lock.lock(); 125 try { 126 condition.signalAll(); 127 } finally { 128 lock.unlock(); 129 } 130 } 131 132 } 133 134 protected WorkCompletionSynchronizer completionSynchronizer; 135 136 @Override 137 public void activate(ComponentContext context) { 138 Framework.addListener(new ShutdownListener()); 139 } 140 141 @Override 142 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 143 if (QUEUES_EP.equals(extensionPoint)) { 144 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 145 } else if (IMPL_EP.equals(extensionPoint)) { 146 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 147 } else { 148 throw new RuntimeException("Unknown extension point: " + extensionPoint); 149 } 150 } 151 152 @Override 153 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 154 if (QUEUES_EP.equals(extensionPoint)) { 155 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 156 } else if (IMPL_EP.equals(extensionPoint)) { 157 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 158 } else { 159 throw new RuntimeException("Unknown extension point: " + extensionPoint); 160 } 161 } 162 163 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 164 String queueId = workQueueDescriptor.id; 165 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 166 Boolean processing = workQueueDescriptor.processing; 167 if (processing == null) { 168 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 169 + " with no processing/queuing"); 170 return; 171 } 172 String what = " processing=" + processing; 173 what += queuing == null ? "" : (" queuing=" + queuing); 174 log.info("Setting on all work queues:" + what); 175 // activate/deactivate processing/queuing on all queues 176 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 177 for (String id : queueIds) { 178 // add an updated contribution redefining processing/queuing 179 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 180 wqd.id = id; 181 wqd.processing = processing; 182 registerWorkQueueDescriptor(wqd); 183 } 184 return; 185 } 186 workQueueConfig.addContribution(workQueueDescriptor); 187 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 188 log.info("Registered work queue " + queueId + " " + wqd.toString()); 189 } 190 191 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 192 String id = workQueueDescriptor.id; 193 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 194 return; 195 } 196 workQueueConfig.removeContribution(workQueueDescriptor); 197 log.info("Unregistered work queue " + id); 198 } 199 200 void initializeQueue(WorkQueueDescriptor config) { 201 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 202 throw new IllegalArgumentException("cannot initialize all queues"); 203 } 204 if (queuing.getQueue(config.id) != null) { 205 throw new IllegalStateException("work queue " + config.id + " is already initialized"); 206 } 207 if (executors.containsKey(config.id)) { 208 throw new IllegalStateException("work queue " + config.id + " already have an executor"); 209 } 210 NuxeoBlockingQueue queue = queuing.init(config); 211 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 212 int maxPoolSize = config.getMaxThreads(); 213 WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, 214 queue, threadFactory); 215 // prestart all core threads so that direct additions to the queue 216 // (from another Nuxeo instance) can be seen 217 executor.prestartAllCoreThreads(); 218 executors.put(config.id, executor); 219 log.info("Initialized work queue " + config.id + " " + config.toEffectiveString()); 220 } 221 222 void activateQueue(WorkQueueDescriptor config) { 223 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 224 throw new IllegalArgumentException("cannot activate all queues"); 225 } 226 queuing.setActive(config.id, config.isProcessingEnabled()); 227 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 228 } 229 230 void deactivateQueue(WorkQueueDescriptor config) { 231 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 232 throw new IllegalArgumentException("cannot deactivate all queues"); 233 } 234 queuing.setActive(config.id, false); 235 log.info("Deactivated work queue " + config.id); 236 } 237 238 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 239 workQueuingConfig.addContribution(descr); 240 } 241 242 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 243 workQueuingConfig.removeContribution(descr); 244 } 245 246 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 247 try { 248 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 249 } catch (ReflectiveOperationException | SecurityException e) { 250 throw new RuntimeException(e); 251 } 252 } 253 254 @Override 255 public boolean isQueuingEnabled(String queueId) { 256 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 257 return wqd != null && wqd.isQueuingEnabled(); 258 } 259 260 @Override 261 public void enableProcessing(boolean value) { 262 for (String queueId : workQueueConfig.getQueueIds()) { 263 queuing.getQueue(queueId).setActive(value); 264 } 265 } 266 267 @Override 268 public void enableProcessing(String queueId, boolean value) throws InterruptedException { 269 WorkQueueDescriptor config = workQueueConfig.get(queueId); 270 if (config == null) { 271 throw new IllegalArgumentException("no such queue " + queueId); 272 } 273 if (!value) { 274 deactivateQueue(config); 275 } else { 276 activateQueue(config); 277 } 278 } 279 280 @Override 281 public boolean isProcessingEnabled() { 282 for (String queueId : workQueueConfig.getQueueIds()) { 283 if (queuing.getQueue(queueId).active) { 284 return true; 285 } 286 } 287 return false; 288 } 289 290 @Override 291 public boolean isProcessingEnabled(String queueId) { 292 if (queueId == null) { 293 return isProcessingEnabled(); 294 } 295 return queuing.getQueue(queueId).active; 296 } 297 298 // ----- WorkManager ----- 299 300 @Override 301 public List<String> getWorkQueueIds() { 302 synchronized (workQueueConfig) { 303 return workQueueConfig.getQueueIds(); 304 } 305 } 306 307 @Override 308 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 309 synchronized (workQueueConfig) { 310 return workQueueConfig.get(queueId); 311 } 312 } 313 314 @Override 315 public String getCategoryQueueId(String category) { 316 if (category == null) { 317 category = DEFAULT_CATEGORY; 318 } 319 String queueId = workQueueConfig.getQueueId(category); 320 if (queueId == null) { 321 queueId = DEFAULT_QUEUE_ID; 322 } 323 return queueId; 324 } 325 326 @Override 327 public int getApplicationStartedOrder() { 328 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 329 } 330 331 @Override 332 public void applicationStarted(ComponentContext context) { 333 init(); 334 } 335 336 protected volatile boolean started = false; 337 338 protected volatile boolean shutdownInProgress = false; 339 340 @Override 341 public void init() { 342 if (started) { 343 return; 344 } 345 synchronized (this) { 346 if (started) { 347 return; 348 } 349 queuing = newWorkQueuing(workQueuingConfig.klass); 350 completionSynchronizer = new WorkCompletionSynchronizer(); 351 started = true; 352 workQueueConfig.index(); 353 for (String id : workQueueConfig.getQueueIds()) { 354 initializeQueue(workQueueConfig.get(id)); 355 } 356 for (String id : workQueueConfig.getQueueIds()) { 357 activateQueue(workQueueConfig.get(id)); 358 } 359 } 360 } 361 362 protected WorkThreadPoolExecutor getExecutor(String queueId) { 363 if (!started) { 364 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 365 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 366 init(); 367 } else { 368 throw new IllegalStateException("Work manager not started, could not access to executors"); 369 } 370 } 371 WorkQueueDescriptor workQueueDescriptor; 372 synchronized (workQueueConfig) { 373 workQueueDescriptor = workQueueConfig.get(queueId); 374 } 375 if (workQueueDescriptor == null) { 376 throw new IllegalArgumentException("No such work queue: " + queueId); 377 } 378 379 return executors.get(queueId); 380 } 381 382 @Override 383 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 384 WorkThreadPoolExecutor executor = getExecutor(queueId); 385 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 386 } 387 388 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 389 throws InterruptedException { 390 // mark executors as shutting down 391 for (WorkThreadPoolExecutor executor : list) { 392 executor.shutdownAndSuspend(); 393 } 394 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 395 // wait until threads termination 396 for (WorkThreadPoolExecutor executor : list) { 397 long t0 = System.currentTimeMillis(); 398 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 399 return false; 400 } 401 timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS); 402 } 403 return true; 404 } 405 406 protected long remainingMillis(long t0, long delay) { 407 long d = System.currentTimeMillis() - t0; 408 if (d > delay) { 409 return 0; 410 } 411 return delay - d; 412 } 413 414 protected synchronized void removeExecutor(String queueId) { 415 executors.remove(queueId); 416 } 417 418 @Override 419 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 420 shutdownInProgress = true; 421 try { 422 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 423 } finally { 424 shutdownInProgress = false; 425 started = false; 426 } 427 } 428 429 protected class ShutdownListener implements RuntimeServiceListener { 430 @Override 431 public void handleEvent(RuntimeServiceEvent event) { 432 if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) { 433 return; 434 } 435 Framework.removeListener(this); 436 try { 437 if (!shutdown(10, TimeUnit.SECONDS)) { 438 log.error("Some processors are still active"); 439 } 440 } catch (InterruptedException cause) { 441 Thread.currentThread().interrupt(); 442 log.error("Interrupted during works manager shutdown, continuing runtime shutdown", cause); 443 } 444 } 445 } 446 447 /** 448 * A work instance and how to schedule it, for schedule-after-commit. 449 * 450 * @since 5.8 451 */ 452 public class WorkScheduling implements Synchronization { 453 public final Work work; 454 455 public final Scheduling scheduling; 456 457 public WorkScheduling(Work work, Scheduling scheduling) { 458 this.work = work; 459 this.scheduling = scheduling; 460 } 461 462 @Override 463 public void beforeCompletion() { 464 } 465 466 @Override 467 public void afterCompletion(int status) { 468 if (status == Status.STATUS_COMMITTED) { 469 schedule(work, scheduling, false); 470 } else if (status == Status.STATUS_ROLLEDBACK) { 471 work.setWorkInstanceState(State.UNKNOWN); 472 } else { 473 throw new IllegalArgumentException("Unsupported transaction status " + status); 474 } 475 } 476 } 477 478 /** 479 * Creates non-daemon threads at normal priority. 480 */ 481 private static class NamedThreadFactory implements ThreadFactory { 482 483 private final AtomicInteger threadNumber = new AtomicInteger(); 484 485 private final ThreadGroup group; 486 487 private final String prefix; 488 489 public NamedThreadFactory(String prefix) { 490 SecurityManager sm = System.getSecurityManager(); 491 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 492 this.prefix = prefix; 493 } 494 495 @Override 496 public Thread newThread(Runnable r) { 497 String name = prefix + threadNumber.incrementAndGet(); 498 Thread thread = new Thread(group, r, name); 499 // do not set daemon 500 thread.setPriority(Thread.NORM_PRIORITY); 501 thread.setUncaughtExceptionHandler((t, 502 e) -> LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e)); 503 return thread; 504 } 505 } 506 507 /** 508 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 509 * <p> 510 * Completed tasks are passed to another queue. 511 * <p> 512 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 513 * persisted, etc). 514 * 515 * @since 5.6 516 */ 517 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 518 519 protected final String queueId; 520 521 /** 522 * List of running Work instances, in order to be able to interrupt them if requested. 523 */ 524 // @GuardedBy("itself") 525 protected final ConcurrentLinkedQueue<Work> running; 526 527 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 528 // Number of work scheduled by this instance 529 protected final Counter scheduledCount; 530 531 // Number of work currently running on this instance 532 protected final Counter runningCount; 533 534 // Number of work completed by this instance 535 protected final Counter completedCount; 536 537 protected final Timer workTimer; 538 539 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 540 NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 541 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 542 queueId = queue.queueId; 543 running = new ConcurrentLinkedQueue<>(); 544 // init metrics 545 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 546 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 547 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 548 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 549 } 550 551 public int getScheduledOrRunningSize() { 552 int ret = 0; 553 for (String queueId : getWorkQueueIds()) { 554 ret += getQueueSize(queueId, null); 555 } 556 return ret; 557 } 558 559 @Override 560 public void execute(Runnable r) { 561 throw new UnsupportedOperationException("use other api"); 562 } 563 564 /** 565 * Executes the given task sometime in the future. 566 * 567 * @param work the work to execute 568 * @see #execute(Runnable) 569 */ 570 public void execute(Work work) { 571 scheduledCount.inc(); 572 submit(work); 573 } 574 575 /** 576 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 577 * directly 578 */ 579 protected void submit(Work work) throws RuntimeException { 580 queuing.workSchedule(queueId, work); 581 } 582 583 @Override 584 protected void beforeExecute(Thread t, Runnable r) { 585 Work work = WorkHolder.getWork(r); 586 if (isShutdown()) { 587 work.setWorkInstanceState(State.SCHEDULED); 588 queuing.workReschedule(queueId, work); 589 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 590 } 591 work.setWorkInstanceState(State.RUNNING); 592 queuing.workRunning(queueId, work); 593 running.add(work); 594 runningCount.inc(); 595 } 596 597 @Override 598 protected void afterExecute(Runnable r, Throwable t) { 599 Work work = WorkHolder.getWork(r); 600 try { 601 if (work.isSuspending()) { 602 log.trace(work + " is suspending, giving up"); 603 return; 604 } 605 if (isShutdown()) { 606 log.trace("rescheduling " + work.getId(), t); 607 work.setWorkInstanceState(State.SCHEDULED); 608 queuing.workReschedule(queueId, work); 609 return; 610 } 611 work.setWorkInstanceState(State.UNKNOWN); 612 queuing.workCompleted(queueId, work); 613 } finally { 614 running.remove(work); 615 runningCount.dec(); 616 completedCount.inc(); 617 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 618 completionSynchronizer.signalCompletedWork(); 619 } 620 } 621 622 /** 623 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 624 * 625 * @throws InterruptedException 626 */ 627 public void shutdownAndSuspend() throws InterruptedException { 628 try { 629 // don't consume the queue anymore 630 queuing.setActive(queueId, false); 631 // suspend all running work 632 for (Work work : running) { 633 work.setWorkInstanceSuspending(); 634 log.trace("suspending and rescheduling " + work.getId()); 635 work.setWorkInstanceState(State.SCHEDULED); 636 queuing.workReschedule(queueId, work); 637 } 638 shutdownNow(); 639 } finally { 640 executors.remove(queueId); 641 } 642 } 643 644 public void removeScheduled(String workId) { 645 queuing.removeScheduled(queueId, workId); 646 } 647 648 } 649 650 @Override 651 public void schedule(Work work) { 652 schedule(work, Scheduling.ENQUEUE, false); 653 } 654 655 @Override 656 public void schedule(Work work, boolean afterCommit) { 657 schedule(work, Scheduling.ENQUEUE, afterCommit); 658 } 659 660 @Override 661 public void schedule(Work work, Scheduling scheduling) { 662 schedule(work, scheduling, false); 663 } 664 665 @Override 666 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 667 String workId = work.getId(); 668 String queueId = getCategoryQueueId(work.getCategory()); 669 if (!isQueuingEnabled(queueId)) { 670 return; 671 } 672 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 673 return; 674 } 675 work.setWorkInstanceState(State.SCHEDULED); 676 WorkSchedulePath.newInstance(work); 677 switch (scheduling) { 678 case ENQUEUE: 679 break; 680 case CANCEL_SCHEDULED: 681 getExecutor(queueId).removeScheduled(workId); 682 break; 683 case IF_NOT_SCHEDULED: 684 case IF_NOT_RUNNING_OR_SCHEDULED: 685 // TODO disabled for now because hasWorkInState uses isScheduled 686 // which is buggy 687 boolean disabled = Boolean.TRUE.booleanValue(); 688 if (!disabled && hasWorkInState(workId, scheduling.state)) { 689 if (log.isDebugEnabled()) { 690 log.debug("Canceling schedule because found: " + scheduling); 691 } 692 return; 693 694 } 695 break; 696 697 } 698 queuing.workSchedule(queueId, work); 699 } 700 701 /** 702 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 703 * 704 * @since 5.8 705 */ 706 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 707 TransactionManager transactionManager; 708 try { 709 transactionManager = TransactionHelper.lookupTransactionManager(); 710 } catch (NamingException e) { 711 transactionManager = null; 712 } 713 if (transactionManager == null) { 714 if (log.isDebugEnabled()) { 715 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 716 } 717 return false; 718 } 719 try { 720 Transaction transaction = transactionManager.getTransaction(); 721 if (transaction == null) { 722 if (log.isDebugEnabled()) { 723 log.debug("Not scheduling work after commit because of missing transaction: " + work); 724 } 725 return false; 726 } 727 int status = transaction.getStatus(); 728 if (status == Status.STATUS_ACTIVE) { 729 if (log.isDebugEnabled()) { 730 log.debug("Scheduling work after commit: " + work); 731 } 732 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 733 return true; 734 } else if (status == Status.STATUS_COMMITTED) { 735 // called in afterCompletion, we can schedule immediately 736 if (log.isDebugEnabled()) { 737 log.debug("Scheduling work immediately: " + work); 738 } 739 return false; 740 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 741 if (log.isDebugEnabled()) { 742 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 743 } 744 return true; 745 } else { 746 if (log.isDebugEnabled()) { 747 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 748 + work); 749 } 750 return false; 751 } 752 } catch (SystemException | RollbackException e) { 753 log.error("Cannot schedule after commit", e); 754 return false; 755 } 756 } 757 758 @Override 759 public Work find(String workId, State state) { 760 return queuing.find(workId, state); 761 } 762 763 /** 764 * @param state SCHEDULED, RUNNING or null for both 765 */ 766 protected boolean hasWorkInState(String workId, State state) { 767 return queuing.isWorkInState(workId, state); 768 } 769 770 @Override 771 public State getWorkState(String workId) { 772 return queuing.getWorkState(workId); 773 } 774 775 @Override 776 public List<Work> listWork(String queueId, State state) { 777 // don't return scheduled after commit 778 return queuing.listWork(queueId, state); 779 } 780 781 @Override 782 public List<String> listWorkIds(String queueId, State state) { 783 return queuing.listWorkIds(queueId, state); 784 } 785 786 @Override 787 public WorkQueueMetrics getMetrics(String queueId) { 788 return queuing.metrics(queueId); 789 } 790 791 @Override 792 public int getQueueSize(String queueId, State state) { 793 WorkQueueMetrics metrics = getMetrics(queueId); 794 if (state == null) { 795 return metrics.scheduled.intValue() + metrics.running.intValue(); 796 } 797 if (state == State.SCHEDULED) { 798 return metrics.scheduled.intValue(); 799 } else if (state == State.RUNNING) { 800 return metrics.running.intValue(); 801 } else { 802 throw new IllegalArgumentException(String.valueOf(state)); 803 } 804 } 805 806 @Override 807 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 808 return awaitCompletion(null, duration, unit); 809 } 810 811 @Override 812 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 813 if (!isStarted()) { 814 return true; 815 } 816 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 817 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 818 long deadline = getTimestampAfter(durationInMs); 819 int pause = (int) Math.min(durationInMs, 500L); 820 log.debug("awaitForCompletion " + durationInMs + " ms"); 821 do { 822 if (noScheduledOrRunningWork(queueId)) { 823 completionSynchronizer.signalCompletedWork(); 824 SequenceTracer.stop("done"); 825 return true; 826 } 827 completionSynchronizer.waitForCompletedWork(pause); 828 } while (System.currentTimeMillis() < deadline); 829 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 830 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 831 return false; 832 } 833 834 protected long getTimestampAfter(long durationInMs) { 835 long ret = System.currentTimeMillis() + durationInMs; 836 if (ret < 0) { 837 ret = Long.MAX_VALUE; 838 } 839 return ret; 840 } 841 842 protected boolean noScheduledOrRunningWork(String queueId) { 843 if (queueId == null) { 844 for (String id : getWorkQueueIds()) { 845 if (!noScheduledOrRunningWork(id)) { 846 return false; 847 } 848 } 849 return true; 850 } 851 if (!isProcessingEnabled(queueId)) { 852 return getExecutor(queueId).runningCount.getCount() == 0L; 853 } 854 if (getQueueSize(queueId, null) > 0) { 855 if (log.isTraceEnabled()) { 856 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: " 857 + getQueueSize(queueId, State.RUNNING)); 858 } 859 return false; 860 } 861 if (log.isTraceEnabled()) { 862 log.trace(queueId + " is completed"); 863 } 864 return true; 865 } 866 867 @Override 868 public boolean isStarted() { 869 return started && !shutdownInProgress; 870 } 871 872}