001/* 002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.ecm.core.event.EventServiceComponent; 049import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 050import org.nuxeo.ecm.core.work.api.Work; 051import org.nuxeo.ecm.core.work.api.Work.State; 052import org.nuxeo.ecm.core.work.api.WorkManager; 053import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 054import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 055import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 056import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 057import org.nuxeo.runtime.RuntimeServiceEvent; 058import org.nuxeo.runtime.RuntimeServiceListener; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.metrics.MetricsService; 061import org.nuxeo.runtime.model.ComponentContext; 062import org.nuxeo.runtime.model.ComponentInstance; 063import org.nuxeo.runtime.model.DefaultComponent; 064import org.nuxeo.runtime.transaction.TransactionHelper; 065 066import com.codahale.metrics.Counter; 067import com.codahale.metrics.MetricRegistry; 068import com.codahale.metrics.SharedMetricRegistries; 069import com.codahale.metrics.Timer; 070 071/** 072 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 073 * implementation. 074 * 075 * @since 5.6 076 */ 077public class WorkManagerImpl extends DefaultComponent implements WorkManager { 078 079 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 080 081 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 082 083 protected static final String QUEUES_EP = "queues"; 084 085 protected static final String IMPL_EP = "implementation"; 086 087 public static final String DEFAULT_QUEUE_ID = "default"; 088 089 public static final String DEFAULT_CATEGORY = "default"; 090 091 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 092 093 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 094 095 // @GuardedBy("itself") 096 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 097 098 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 099 100 // used synchronized 101 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 102 103 protected WorkQueuing queuing; 104 105 /** 106 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 107 * node in cluster mode. 108 */ 109 protected class WorkCompletionSynchronizer { 110 final protected ReentrantLock lock = new ReentrantLock(); 111 112 final protected Condition condition = lock.newCondition(); 113 114 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 115 lock.lock(); 116 try { 117 return condition.await(timeMs, TimeUnit.MILLISECONDS); 118 } finally { 119 lock.unlock(); 120 } 121 } 122 123 protected void signalCompletedWork() { 124 lock.lock(); 125 try { 126 condition.signalAll(); 127 } finally { 128 lock.unlock(); 129 } 130 } 131 132 } 133 134 protected WorkCompletionSynchronizer completionSynchronizer; 135 136 @Override 137 public void activate(ComponentContext context) { 138 Framework.addListener(new ShutdownListener()); 139 } 140 141 @Override 142 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 143 if (QUEUES_EP.equals(extensionPoint)) { 144 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 145 } else if (IMPL_EP.equals(extensionPoint)) { 146 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 147 } else { 148 throw new RuntimeException("Unknown extension point: " + extensionPoint); 149 } 150 } 151 152 @Override 153 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 154 if (QUEUES_EP.equals(extensionPoint)) { 155 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 156 } else if (IMPL_EP.equals(extensionPoint)) { 157 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 158 } else { 159 throw new RuntimeException("Unknown extension point: " + extensionPoint); 160 } 161 } 162 163 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 164 String queueId = workQueueDescriptor.id; 165 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 166 Boolean processing = workQueueDescriptor.processing; 167 if (processing == null) { 168 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 169 + " with no processing/queuing"); 170 return; 171 } 172 String what = processing == null ? "" : (" processing=" + processing); 173 what += queuing == null ? "" : (" queuing=" + queuing); 174 log.info("Setting on all work queues:" + what); 175 // activate/deactivate processing/queuing on all queues 176 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 177 for (String id : queueIds) { 178 // add an updated contribution redefining processing/queuing 179 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 180 wqd.id = id; 181 wqd.processing = processing; 182 registerWorkQueueDescriptor(wqd); 183 } 184 return; 185 } 186 workQueueConfig.addContribution(workQueueDescriptor); 187 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 188 log.info("Registered work queue " + queueId + " " + wqd.toString()); 189 } 190 191 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 192 String id = workQueueDescriptor.id; 193 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 194 return; 195 } 196 workQueueConfig.removeContribution(workQueueDescriptor); 197 log.info("Unregistered work queue " + id); 198 } 199 200 void activateQueue(WorkQueueDescriptor config) { 201 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 202 return; 203 } 204 NuxeoBlockingQueue queue = queuing.getQueue(config.id); 205 if (queue == null) { 206 queue = queuing.init(config); 207 } 208 WorkThreadPoolExecutor executor = executors.get(config.id); 209 if (executor == null) { 210 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 211 int maxPoolSize = config.getMaxThreads(); 212 executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 213 0, TimeUnit.SECONDS, 214 queue, threadFactory); 215 // prestart all core threads so that direct additions to the queue 216 // (from another Nuxeo instance) can be seen 217 executor.prestartAllCoreThreads(); 218 executors.put(config.id, executor); 219 } 220 queuing.setActive(config.id, config.isProcessingEnabled()); 221 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 222 } 223 224 void deactivateQueue(WorkQueueDescriptor config) { 225 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 226 return; 227 } 228 WorkThreadPoolExecutor executor = executors.get(config.id); 229 try { 230 executor.shutdownAndSuspend(); 231 } catch (InterruptedException cause) { 232 Thread.currentThread().interrupt(); 233 throw new RuntimeException("Interrupted while deactivating queue " + config.id, cause); 234 } 235 log.info("Deactivated work queue " + config.id); 236 } 237 238 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 239 workQueuingConfig.addContribution(descr); 240 } 241 242 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 243 workQueuingConfig.removeContribution(descr); 244 } 245 246 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 247 try { 248 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 249 } catch (ReflectiveOperationException | SecurityException e) { 250 throw new RuntimeException(e); 251 } 252 } 253 254 @Override 255 public boolean isQueuingEnabled(String queueId) { 256 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 257 return wqd == null ? false : wqd.isQueuingEnabled(); 258 } 259 260 @Override 261 public void enableProcessing(boolean value) { 262 for (String queueId : workQueueConfig.getQueueIds()) { 263 queuing.getQueue(queueId).setActive(value); 264 } 265 } 266 267 @Override 268 public void enableProcessing(String queueId, boolean value) throws InterruptedException { 269 WorkQueueDescriptor config = workQueueConfig.get(queueId); 270 if (config == null) { 271 throw new IllegalArgumentException("no such queue " + queueId); 272 } 273 if (!value) { 274 deactivateQueue(config); 275 } else { 276 activateQueue(config); 277 } 278 } 279 280 @Override 281 public boolean isProcessingEnabled() { 282 for (String queueId : workQueueConfig.getQueueIds()) { 283 if (queuing.getQueue(queueId).active) { 284 return true; 285 } 286 } 287 return false; 288 } 289 290 @Override 291 public boolean isProcessingEnabled(String queueId) { 292 if (queueId == null) { 293 return isProcessingEnabled(); 294 } 295 return queuing.getQueue(queueId).active; 296 } 297 298 // ----- WorkManager ----- 299 300 @Override 301 public List<String> getWorkQueueIds() { 302 synchronized (workQueueConfig) { 303 return workQueueConfig.getQueueIds(); 304 } 305 } 306 307 @Override 308 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 309 synchronized (workQueueConfig) { 310 return workQueueConfig.get(queueId); 311 } 312 } 313 314 @Override 315 public String getCategoryQueueId(String category) { 316 if (category == null) { 317 category = DEFAULT_CATEGORY; 318 } 319 String queueId = workQueueConfig.getQueueId(category); 320 if (queueId == null) { 321 queueId = DEFAULT_QUEUE_ID; 322 } 323 return queueId; 324 } 325 326 @Override 327 public int getApplicationStartedOrder() { 328 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 329 } 330 331 @Override 332 public void applicationStarted(ComponentContext context) { 333 init(); 334 } 335 336 protected volatile boolean started = false; 337 338 protected volatile boolean shutdownInProgress = false; 339 340 @Override 341 public void init() { 342 if (started) { 343 return; 344 } 345 synchronized (this) { 346 if (started) { 347 return; 348 } 349 queuing = newWorkQueuing(workQueuingConfig.klass); 350 completionSynchronizer = new WorkCompletionSynchronizer(); 351 started = true; 352 workQueueConfig.index(); 353 for (String id : workQueueConfig.getQueueIds()) { 354 activateQueue(workQueueConfig.get(id)); 355 } 356 } 357 } 358 359 protected WorkThreadPoolExecutor getExecutor(String queueId) { 360 if (!started) { 361 if (Framework.isTestModeSet() && !Framework.getRuntime() 362 .isShuttingDown()) { 363 LogFactory.getLog(WorkManagerImpl.class) 364 .warn("Lazy starting of work manager in test mode"); 365 init(); 366 } else { 367 throw new IllegalStateException("Work manager not started, could not access to executors"); 368 } 369 } 370 WorkQueueDescriptor workQueueDescriptor; 371 synchronized (workQueueConfig) { 372 workQueueDescriptor = workQueueConfig.get(queueId); 373 } 374 if (workQueueDescriptor == null) { 375 throw new IllegalArgumentException("No such work queue: " + queueId); 376 } 377 378 return executors.get(queueId); 379 } 380 381 @Override 382 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 383 WorkThreadPoolExecutor executor = getExecutor(queueId); 384 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 385 } 386 387 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 388 throws InterruptedException { 389 // mark executors as shutting down 390 for (WorkThreadPoolExecutor executor : list) { 391 executor.shutdownAndSuspend(); 392 } 393 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 394 // wait until threads termination 395 for (WorkThreadPoolExecutor executor : list) { 396 long t0 = System.currentTimeMillis(); 397 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 398 return false; 399 } 400 timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS); 401 } 402 return true; 403 } 404 405 protected long remainingMillis(long t0, long delay) { 406 long d = System.currentTimeMillis() - t0; 407 if (d > delay) { 408 return 0; 409 } 410 return delay - d; 411 } 412 413 protected synchronized void removeExecutor(String queueId) { 414 executors.remove(queueId); 415 } 416 417 @Override 418 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 419 shutdownInProgress = true; 420 try { 421 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 422 } finally { 423 shutdownInProgress = false; 424 started = false; 425 } 426 } 427 428 protected class ShutdownListener implements RuntimeServiceListener { 429 @Override 430 public void handleEvent(RuntimeServiceEvent event) { 431 if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) { 432 return; 433 } 434 Framework.removeListener(this); 435 try { 436 if (!shutdown(10, TimeUnit.SECONDS)) { 437 log.error("Some processors are still active"); 438 } 439 } catch (InterruptedException cause) { 440 Thread.currentThread().interrupt(); 441 log.error("Interrupted during works manager shutdown, continuing runtime shutdown", cause); 442 } 443 } 444 } 445 446 /** 447 * A work instance and how to schedule it, for schedule-after-commit. 448 * 449 * @since 5.8 450 */ 451 public class WorkScheduling implements Synchronization { 452 public final Work work; 453 454 public final Scheduling scheduling; 455 456 public WorkScheduling(Work work, Scheduling scheduling) { 457 this.work = work; 458 this.scheduling = scheduling; 459 } 460 461 @Override 462 public void beforeCompletion() { 463 } 464 465 @Override 466 public void afterCompletion(int status) { 467 if (status == Status.STATUS_COMMITTED) { 468 schedule(work, scheduling, false); 469 } else if (status == Status.STATUS_ROLLEDBACK) { 470 work.setWorkInstanceState(State.UNKNOWN); 471 } else { 472 throw new IllegalArgumentException("Unsupported transaction status " + status); 473 } 474 } 475 } 476 477 /** 478 * Creates non-daemon threads at normal priority. 479 */ 480 private static class NamedThreadFactory implements ThreadFactory { 481 482 private final AtomicInteger threadNumber = new AtomicInteger(); 483 484 private final ThreadGroup group; 485 486 private final String prefix; 487 488 public NamedThreadFactory(String prefix) { 489 SecurityManager sm = System.getSecurityManager(); 490 group = sm == null ? Thread.currentThread() 491 .getThreadGroup() : sm.getThreadGroup(); 492 this.prefix = prefix; 493 } 494 495 @Override 496 public Thread newThread(Runnable r) { 497 String name = prefix + threadNumber.incrementAndGet(); 498 Thread thread = new Thread(group, r, name); 499 // do not set daemon 500 thread.setPriority(Thread.NORM_PRIORITY); 501 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 502 503 @Override 504 public void uncaughtException(Thread t, Throwable e) { 505 LogFactory.getLog(WorkManagerImpl.class) 506 .error("Uncaught error on thread " + t.getName(), e); 507 } 508 }); 509 return thread; 510 } 511 } 512 513 /** 514 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 515 * <p> 516 * Completed tasks are passed to another queue. 517 * <p> 518 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 519 * persisted, etc). 520 * 521 * @since 5.6 522 */ 523 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 524 525 protected final String queueId; 526 527 /** 528 * List of running Work instances, in order to be able to interrupt them if requested. 529 */ 530 // @GuardedBy("itself") 531 protected final ConcurrentLinkedQueue<Work> running; 532 533 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 534 // Number of work scheduled by this instance 535 protected final Counter scheduledCount; 536 537 // Number of work currently running on this instance 538 protected final Counter runningCount; 539 540 // Number of work completed by this instance 541 protected final Counter completedCount; 542 543 protected final Timer workTimer; 544 545 546 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 547 TimeUnit unit, NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 548 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 549 queueId = queue.queueId; 550 running = new ConcurrentLinkedQueue<Work>(); 551 // init metrics 552 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 553 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 554 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 555 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 556 } 557 558 public int getScheduledOrRunningSize() { 559 int ret = 0; 560 for (String queueId : getWorkQueueIds()) { 561 ret += getQueueSize(queueId, null); 562 } 563 return ret; 564 } 565 566 @Override 567 public void execute(Runnable r) { 568 throw new UnsupportedOperationException("use other api"); 569 } 570 571 /** 572 * Executes the given task sometime in the future. 573 * 574 * @param work the work to execute 575 * @see #execute(Runnable) 576 */ 577 public void execute(Work work) { 578 scheduledCount.inc(); 579 submit(work); 580 } 581 582 /** 583 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 584 * directly 585 * 586 * @param work 587 * @throws RuntimeException 588 */ 589 protected void submit(Work work) throws RuntimeException { 590 queuing.workSchedule(queueId, work); 591 } 592 593 @Override 594 protected void beforeExecute(Thread t, Runnable r) { 595 Work work = WorkHolder.getWork(r); 596 if (isShutdown()) { 597 work.setWorkInstanceState(State.SCHEDULED); 598 queuing.workReschedule(queueId, work); 599 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 600 } 601 work.setWorkInstanceState(State.RUNNING); 602 queuing.workRunning(queueId, work); 603 running.add(work); 604 runningCount.inc(); 605 } 606 607 @Override 608 protected void afterExecute(Runnable r, Throwable t) { 609 Work work = WorkHolder.getWork(r); 610 try { 611 if (work.isSuspending()) { 612 log.trace(work + " is suspending, giving up"); 613 return; 614 } 615 if (isShutdown()) { 616 log.trace("rescheduling " + work.getId(), t); 617 work.setWorkInstanceState(State.SCHEDULED); 618 queuing.workReschedule(queueId, work); 619 return; 620 } 621 work.setWorkInstanceState(State.UNKNOWN); 622 queuing.workCompleted(queueId, work); 623 } finally { 624 running.remove(work); 625 runningCount.dec(); 626 completedCount.inc(); 627 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 628 completionSynchronizer.signalCompletedWork(); 629 } 630 } 631 632 633 /** 634 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 635 * @throws InterruptedException 636 */ 637 public void shutdownAndSuspend() throws InterruptedException { 638 try { 639 // don't consume the queue anymore 640 queuing.setActive(queueId, false); 641 // suspend all running work 642 for (Work work : running) { 643 work.setWorkInstanceSuspending(); 644 log.trace("suspending and rescheduling " + work.getId()); 645 work.setWorkInstanceState(State.SCHEDULED); 646 queuing.workReschedule(queueId, work); 647 } 648 shutdownNow(); 649 } finally { 650 executors.remove(queueId); 651 } 652 } 653 654 public void removeScheduled(String workId) { 655 queuing.removeScheduled(queueId, workId); 656 } 657 658 } 659 660 @Override 661 public void schedule(Work work) { 662 schedule(work, Scheduling.ENQUEUE, false); 663 } 664 665 @Override 666 public void schedule(Work work, boolean afterCommit) { 667 schedule(work, Scheduling.ENQUEUE, afterCommit); 668 } 669 670 @Override 671 public void schedule(Work work, Scheduling scheduling) { 672 schedule(work, scheduling, false); 673 } 674 675 @Override 676 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 677 String workId = work.getId(); 678 String queueId = getCategoryQueueId(work.getCategory()); 679 if (!isQueuingEnabled(queueId)) { 680 return; 681 } 682 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 683 return; 684 } 685 work.setWorkInstanceState(State.SCHEDULED); 686 WorkSchedulePath.newInstance(work); 687 switch (scheduling) { 688 case ENQUEUE: 689 break; 690 case CANCEL_SCHEDULED: 691 getExecutor(queueId).removeScheduled(workId); 692 break; 693 case IF_NOT_SCHEDULED: 694 case IF_NOT_RUNNING: 695 case IF_NOT_RUNNING_OR_SCHEDULED: 696 // TODO disabled for now because hasWorkInState uses isScheduled 697 // which is buggy 698 boolean disabled = Boolean.TRUE.booleanValue(); 699 if (!disabled && hasWorkInState(workId, scheduling.state)) { 700 if (log.isDebugEnabled()) { 701 log.debug("Canceling schedule because found: " + scheduling); 702 } 703 return; 704 705 } 706 break; 707 708 } 709 queuing.workSchedule(queueId, work); 710 } 711 712 /** 713 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 714 * 715 * @since 5.8 716 */ 717 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 718 TransactionManager transactionManager; 719 try { 720 transactionManager = TransactionHelper.lookupTransactionManager(); 721 } catch (NamingException e) { 722 transactionManager = null; 723 } 724 if (transactionManager == null) { 725 if (log.isDebugEnabled()) { 726 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 727 } 728 return false; 729 } 730 try { 731 Transaction transaction = transactionManager.getTransaction(); 732 if (transaction == null) { 733 if (log.isDebugEnabled()) { 734 log.debug("Not scheduling work after commit because of missing transaction: " + work); 735 } 736 return false; 737 } 738 int status = transaction.getStatus(); 739 if (status == Status.STATUS_ACTIVE) { 740 if (log.isDebugEnabled()) { 741 log.debug("Scheduling work after commit: " + work); 742 } 743 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 744 return true; 745 } else if (status == Status.STATUS_COMMITTED) { 746 // called in afterCompletion, we can schedule immediately 747 if (log.isDebugEnabled()) { 748 log.debug("Scheduling work immediately: " + work); 749 } 750 return false; 751 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 752 if (log.isDebugEnabled()) { 753 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 754 } 755 return true; 756 } else { 757 if (log.isDebugEnabled()) { 758 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 759 + work); 760 } 761 return false; 762 } 763 } catch (SystemException | RollbackException e) { 764 log.error("Cannot schedule after commit", e); 765 return false; 766 } 767 } 768 769 @Override 770 @Deprecated 771 public Work find(Work work, State state, boolean useEquals, int[] pos) { 772 if (pos != null) { 773 pos[0] = 0; // compat 774 } 775 String workId = work.getId(); 776 return queuing.find(workId, state); 777 } 778 779 @Override 780 public Work find(String workId, State state) { 781 return queuing.find(workId, state); 782 } 783 784 /** 785 * @param state SCHEDULED, RUNNING or null for both 786 */ 787 protected boolean hasWorkInState(String workId, State state) { 788 return queuing.isWorkInState(workId, state); 789 } 790 791 @Override 792 public State getWorkState(String workId) { 793 return queuing.getWorkState(workId); 794 } 795 796 @Override 797 public List<Work> listWork(String queueId, State state) { 798 // don't return scheduled after commit 799 return queuing.listWork(queueId, state); 800 } 801 802 @Override 803 public List<String> listWorkIds(String queueId, State state) { 804 return queuing.listWorkIds(queueId, state); 805 } 806 807 @Override 808 public WorkQueueMetrics getMetrics(String queueId) { 809 return queuing.metrics(queueId); 810 } 811 812 @Override 813 public int getQueueSize(String queueId, State state) { 814 WorkQueueMetrics metrics = getMetrics(queueId); 815 if (state == null) { 816 return metrics.scheduled.intValue() + metrics.running.intValue(); 817 } 818 if (state == State.SCHEDULED) { 819 return metrics.scheduled.intValue(); 820 } else if (state == State.RUNNING) { 821 return metrics.running.intValue(); 822 } else { 823 throw new IllegalArgumentException(String.valueOf(state)); 824 } 825 } 826 827 @Override 828 @Deprecated 829 public int getNonCompletedWorkSize(String queueId) { 830 return getQueueSize(queueId, null); 831 } 832 833 @Override 834 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 835 return awaitCompletion(null, duration, unit); 836 } 837 838 @Override 839 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 840 if (!isStarted()) { 841 return true; 842 } 843 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 844 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 845 long deadline = getTimestampAfter(durationInMs); 846 int pause = (int) Math.min(duration, 500L); 847 log.debug("awaitForCompletion " + durationInMs + " ms"); 848 do { 849 if (noScheduledOrRunningWork(queueId)) { 850 completionSynchronizer.signalCompletedWork(); 851 SequenceTracer.stop("done"); 852 return true; 853 } 854 completionSynchronizer.waitForCompletedWork(pause); 855 } while (System.currentTimeMillis() < deadline); 856 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 857 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 858 return false; 859 } 860 861 protected long getTimestampAfter(long durationInMs) { 862 long ret = System.currentTimeMillis() + durationInMs; 863 if (ret < 0) { 864 ret = Long.MAX_VALUE; 865 } 866 return ret; 867 } 868 869 protected boolean noScheduledOrRunningWork(String queueId) { 870 if (queueId == null) { 871 for (String id : getWorkQueueIds()) { 872 if (!noScheduledOrRunningWork(id)) { 873 return false; 874 } 875 } 876 return true; 877 } 878 if (!isProcessingEnabled(queueId)) { 879 return getExecutor(queueId).runningCount.getCount() == 0L; 880 } 881 boolean ret = getQueueSize(queueId, null) == 0; 882 if (ret == false) { 883 if (log.isTraceEnabled()) { 884 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + 885 ", running: " + getQueueSize(queueId, State.RUNNING)); 886 } 887 return false; 888 } 889 if (log.isTraceEnabled()) { 890 log.trace(queueId + " is completed"); 891 } 892 return true; 893 } 894 895 @Override 896 public boolean isStarted() { 897 return started && !shutdownInProgress; 898 } 899 900}