001/* 002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.locks.Condition; 034import java.util.concurrent.locks.ReentrantLock; 035 036import javax.naming.NamingException; 037import javax.transaction.RollbackException; 038import javax.transaction.Status; 039import javax.transaction.Synchronization; 040import javax.transaction.SystemException; 041import javax.transaction.Transaction; 042import javax.transaction.TransactionManager; 043 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.nuxeo.common.logging.SequenceTracer; 047import org.nuxeo.ecm.core.event.EventServiceComponent; 048import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 049import org.nuxeo.ecm.core.work.api.Work; 050import org.nuxeo.ecm.core.work.api.Work.State; 051import org.nuxeo.ecm.core.work.api.WorkManager; 052import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 053import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 054import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 055import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 056import org.nuxeo.runtime.RuntimeServiceEvent; 057import org.nuxeo.runtime.RuntimeServiceListener; 058import org.nuxeo.runtime.api.Framework; 059import org.nuxeo.runtime.metrics.MetricsService; 060import org.nuxeo.runtime.model.ComponentContext; 061import org.nuxeo.runtime.model.ComponentInstance; 062import org.nuxeo.runtime.model.DefaultComponent; 063import org.nuxeo.runtime.transaction.TransactionHelper; 064 065import com.codahale.metrics.Counter; 066import com.codahale.metrics.MetricRegistry; 067import com.codahale.metrics.SharedMetricRegistries; 068import com.codahale.metrics.Timer; 069 070/** 071 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 072 * implementation. 073 * 074 * @since 5.6 075 */ 076public class WorkManagerImpl extends DefaultComponent implements WorkManager { 077 078 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 079 080 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 081 082 protected static final String QUEUES_EP = "queues"; 083 084 protected static final String IMPL_EP = "implementation"; 085 086 public static final String DEFAULT_QUEUE_ID = "default"; 087 088 public static final String DEFAULT_CATEGORY = "default"; 089 090 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 091 092 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 093 094 // @GuardedBy("itself") 095 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 096 097 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 098 099 // used synchronized 100 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 101 102 protected WorkQueuing queuing; 103 104 /** 105 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 106 * node in cluster mode. 107 */ 108 protected class WorkCompletionSynchronizer { 109 final protected ReentrantLock lock = new ReentrantLock(); 110 111 final protected Condition condition = lock.newCondition(); 112 113 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 114 lock.lock(); 115 try { 116 return condition.await(timeMs, TimeUnit.MILLISECONDS); 117 } finally { 118 lock.unlock(); 119 } 120 } 121 122 protected void signalCompletedWork() { 123 lock.lock(); 124 try { 125 condition.signalAll(); 126 } finally { 127 lock.unlock(); 128 } 129 } 130 131 } 132 133 protected WorkCompletionSynchronizer completionSynchronizer; 134 135 @Override 136 public void activate(ComponentContext context) { 137 Framework.addListener(new ShutdownListener()); 138 } 139 140 @Override 141 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 142 if (QUEUES_EP.equals(extensionPoint)) { 143 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 144 } else if (IMPL_EP.equals(extensionPoint)) { 145 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 146 } else { 147 throw new RuntimeException("Unknown extension point: " + extensionPoint); 148 } 149 } 150 151 @Override 152 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 153 if (QUEUES_EP.equals(extensionPoint)) { 154 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 155 } else if (IMPL_EP.equals(extensionPoint)) { 156 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 157 } else { 158 throw new RuntimeException("Unknown extension point: " + extensionPoint); 159 } 160 } 161 162 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 163 String queueId = workQueueDescriptor.id; 164 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 165 Boolean processing = workQueueDescriptor.processing; 166 if (processing == null) { 167 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 168 + " with no processing/queuing"); 169 return; 170 } 171 String what = processing == null ? "" : (" processing=" + processing); 172 what += queuing == null ? "" : (" queuing=" + queuing); 173 log.info("Setting on all work queues:" + what); 174 // activate/deactivate processing/queuing on all queues 175 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 176 for (String id : queueIds) { 177 // add an updated contribution redefining processing/queuing 178 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 179 wqd.id = id; 180 wqd.processing = processing; 181 registerWorkQueueDescriptor(wqd); 182 } 183 return; 184 } 185 workQueueConfig.addContribution(workQueueDescriptor); 186 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 187 log.info("Registered work queue " + queueId + " " + wqd.toString()); 188 } 189 190 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 191 String id = workQueueDescriptor.id; 192 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 193 return; 194 } 195 workQueueConfig.removeContribution(workQueueDescriptor); 196 log.info("Unregistered work queue " + id); 197 } 198 199 void activateQueue(WorkQueueDescriptor config) { 200 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 201 return; 202 } 203 NuxeoBlockingQueue queue = queuing.getQueue(config.id); 204 if (queue == null) { 205 queue = queuing.init(config); 206 } 207 WorkThreadPoolExecutor executor = executors.get(config.id); 208 if (executor == null) { 209 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 210 int maxPoolSize = config.getMaxThreads(); 211 executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 212 0, TimeUnit.SECONDS, 213 queue, threadFactory); 214 // prestart all core threads so that direct additions to the queue 215 // (from another Nuxeo instance) can be seen 216 executor.prestartAllCoreThreads(); 217 executors.put(config.id, executor); 218 } 219 queuing.setActive(config.id, config.isProcessingEnabled()); 220 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 221 } 222 223 void deactivateQueue(WorkQueueDescriptor config) { 224 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 225 return; 226 } 227 queuing.setActive(config.id, false); 228 WorkThreadPoolExecutor executor = executors.remove(config.id); 229 executor.shutdownAndSuspend(); 230 log.info("Deactivated work queue " + config.id); 231 } 232 233 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 234 workQueuingConfig.addContribution(descr); 235 } 236 237 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 238 workQueuingConfig.removeContribution(descr); 239 } 240 241 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 242 try { 243 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 244 } catch (ReflectiveOperationException | SecurityException e) { 245 throw new RuntimeException(e); 246 } 247 } 248 249 @Override 250 public boolean isQueuingEnabled(String queueId) { 251 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 252 return wqd == null ? false : wqd.isQueuingEnabled(); 253 } 254 255 @Override 256 public void enableProcessing(boolean value) { 257 for (String queueId : workQueueConfig.getQueueIds()) { 258 queuing.getQueue(queueId).setActive(value); 259 } 260 } 261 262 @Override 263 public void enableProcessing(String queueId, boolean value) throws InterruptedException { 264 WorkQueueDescriptor config = workQueueConfig.get(queueId); 265 if (config == null) { 266 throw new IllegalArgumentException("no such queue " + queueId); 267 } 268 if (!value) { 269 deactivateQueue(config); 270 } else { 271 activateQueue(config); 272 } 273 } 274 275 @Override 276 public boolean isProcessingEnabled() { 277 for (String queueId : workQueueConfig.getQueueIds()) { 278 if (queuing.getQueue(queueId).active) { 279 return true; 280 } 281 } 282 return false; 283 } 284 285 @Override 286 public boolean isProcessingEnabled(String queueId) { 287 return queuing.getQueue(queueId).active; 288 } 289 290 // ----- WorkManager ----- 291 292 @Override 293 public List<String> getWorkQueueIds() { 294 synchronized (workQueueConfig) { 295 return workQueueConfig.getQueueIds(); 296 } 297 } 298 299 @Override 300 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 301 synchronized (workQueueConfig) { 302 return workQueueConfig.get(queueId); 303 } 304 } 305 306 @Override 307 public String getCategoryQueueId(String category) { 308 if (category == null) { 309 category = DEFAULT_CATEGORY; 310 } 311 String queueId = workQueueConfig.getQueueId(category); 312 if (queueId == null) { 313 queueId = DEFAULT_QUEUE_ID; 314 } 315 return queueId; 316 } 317 318 @Override 319 public int getApplicationStartedOrder() { 320 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 321 } 322 323 @Override 324 public void applicationStarted(ComponentContext context) { 325 init(); 326 } 327 328 protected volatile boolean started = false; 329 330 protected volatile boolean shutdownInProgress = false; 331 332 @Override 333 public void init() { 334 if (started) { 335 return; 336 } 337 synchronized (this) { 338 if (started) { 339 return; 340 } 341 queuing = newWorkQueuing(workQueuingConfig.klass); 342 completionSynchronizer = new WorkCompletionSynchronizer(); 343 started = true; 344 workQueueConfig.index(); 345 for (String id : workQueueConfig.getQueueIds()) { 346 activateQueue(workQueueConfig.get(id)); 347 } 348 } 349 } 350 351 protected WorkThreadPoolExecutor getExecutor(String queueId) { 352 if (!started) { 353 if (Framework.isTestModeSet() && !Framework.getRuntime() 354 .isShuttingDown()) { 355 LogFactory.getLog(WorkManagerImpl.class) 356 .warn("Lazy starting of work manager in test mode"); 357 init(); 358 } else { 359 throw new IllegalStateException("Work manager not started, could not access to executors"); 360 } 361 } 362 WorkQueueDescriptor workQueueDescriptor; 363 synchronized (workQueueConfig) { 364 workQueueDescriptor = workQueueConfig.get(queueId); 365 } 366 if (workQueueDescriptor == null) { 367 throw new IllegalArgumentException("No such work queue: " + queueId); 368 } 369 370 return executors.get(queueId); 371 } 372 373 @Override 374 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 375 WorkThreadPoolExecutor executor = getExecutor(queueId); 376 boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit); 377 removeExecutor(queueId); // start afresh 378 return terminated; 379 } 380 381 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 382 throws InterruptedException { 383 // mark executors as shutting down 384 for (WorkThreadPoolExecutor executor : list) { 385 executor.shutdownAndSuspend(); 386 } 387 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 388 // wait until threads termination 389 for (WorkThreadPoolExecutor executor : list) { 390 long t0 = System.currentTimeMillis(); 391 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 392 return false; 393 } 394 timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS); 395 } 396 return true; 397 } 398 399 protected long remainingMillis(long t0, long delay) { 400 long d = System.currentTimeMillis() - t0; 401 if (d > delay) { 402 return 0; 403 } 404 return delay - d; 405 } 406 407 protected synchronized void removeExecutor(String queueId) { 408 executors.remove(queueId); 409 } 410 411 @Override 412 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 413 shutdownInProgress = true; 414 try { 415 return shutdownExecutors(executors.values(), timeout, unit); 416 } finally { 417 shutdownInProgress = false; 418 started = false; 419 completionSynchronizer = null; 420 executors.clear(); 421 queuing = null; 422 } 423 } 424 425 protected class ShutdownListener implements RuntimeServiceListener { 426 @Override 427 public void handleEvent(RuntimeServiceEvent event) { 428 if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) { 429 return; 430 } 431 Framework.removeListener(this); 432 try { 433 if (!shutdown(10, TimeUnit.SECONDS)) { 434 log.error("Some processors are still active"); 435 } 436 } catch (InterruptedException cause) { 437 Thread.currentThread().interrupt(); 438 log.error("Interrupted during works manager shutdown, continuing runtime shutdown", cause); 439 } 440 } 441 } 442 443 /** 444 * A work instance and how to schedule it, for schedule-after-commit. 445 * 446 * @since 5.8 447 */ 448 public class WorkScheduling implements Synchronization { 449 public final Work work; 450 451 public final Scheduling scheduling; 452 453 public WorkScheduling(Work work, Scheduling scheduling) { 454 this.work = work; 455 this.scheduling = scheduling; 456 } 457 458 @Override 459 public void beforeCompletion() { 460 } 461 462 @Override 463 public void afterCompletion(int status) { 464 if (status == Status.STATUS_COMMITTED) { 465 schedule(work, scheduling, false); 466 } else if (status == Status.STATUS_ROLLEDBACK) { 467 work.setWorkInstanceState(State.UNKNOWN); 468 } else { 469 throw new IllegalArgumentException("Unsupported transaction status " + status); 470 } 471 } 472 } 473 474 /** 475 * Creates non-daemon threads at normal priority. 476 */ 477 private static class NamedThreadFactory implements ThreadFactory { 478 479 private final AtomicInteger threadNumber = new AtomicInteger(); 480 481 private final ThreadGroup group; 482 483 private final String prefix; 484 485 public NamedThreadFactory(String prefix) { 486 SecurityManager sm = System.getSecurityManager(); 487 group = sm == null ? Thread.currentThread() 488 .getThreadGroup() : sm.getThreadGroup(); 489 this.prefix = prefix; 490 } 491 492 @Override 493 public Thread newThread(Runnable r) { 494 String name = prefix + threadNumber.incrementAndGet(); 495 Thread thread = new Thread(group, r, name); 496 // do not set daemon 497 thread.setPriority(Thread.NORM_PRIORITY); 498 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 499 500 @Override 501 public void uncaughtException(Thread t, Throwable e) { 502 LogFactory.getLog(WorkManagerImpl.class) 503 .error("Uncaught error on thread " + t.getName(), e); 504 } 505 }); 506 return thread; 507 } 508 } 509 510 /** 511 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 512 * <p> 513 * Completed tasks are passed to another queue. 514 * <p> 515 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 516 * persisted, etc). 517 * 518 * @since 5.6 519 */ 520 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 521 522 protected final String queueId; 523 524 /** 525 * List of running Work instances, in order to be able to interrupt them if requested. 526 */ 527 // @GuardedBy("itself") 528 protected final ConcurrentLinkedQueue<Work> running; 529 530 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 531 // Number of work scheduled by this instance 532 protected final Counter scheduledCount; 533 534 // Number of work currently running on this instance 535 protected final Counter runningCount; 536 537 // Number of work completed by this instance 538 protected final Counter completedCount; 539 540 protected final Timer workTimer; 541 542 543 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 544 TimeUnit unit, NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 545 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 546 queueId = queue.queueId; 547 running = new ConcurrentLinkedQueue<Work>(); 548 // init metrics 549 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 550 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 551 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 552 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 553 } 554 555 public int getScheduledOrRunningSize() { 556 int ret = 0; 557 for (String queueId : getWorkQueueIds()) { 558 ret += getQueueSize(queueId, null); 559 } 560 return ret; 561 } 562 563 @Override 564 public void execute(Runnable r) { 565 throw new UnsupportedOperationException("use other api"); 566 } 567 568 /** 569 * Executes the given task sometime in the future. 570 * 571 * @param work the work to execute 572 * @see #execute(Runnable) 573 */ 574 public void execute(Work work) { 575 scheduledCount.inc(); 576 submit(work); 577 } 578 579 /** 580 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 581 * directly 582 * 583 * @param work 584 * @throws RuntimeException 585 */ 586 protected void submit(Work work) throws RuntimeException { 587 queuing.workSchedule(queueId, work); 588 } 589 590 @Override 591 protected void beforeExecute(Thread t, Runnable r) { 592 Work work = WorkHolder.getWork(r); 593 if (started == false || shutdownInProgress == true) { 594 work.setWorkInstanceState(State.SCHEDULED); 595 queuing.workSchedule(queueId, work); 596 return; 597 } 598 work.setWorkInstanceState(State.RUNNING); 599 queuing.workRunning(queueId, work); 600 running.add(work); 601 runningCount.inc(); 602 } 603 604 @Override 605 protected void afterExecute(Runnable r, Throwable t) { 606 Work work = WorkHolder.getWork(r); 607 try { 608 if (work.isSuspending()) { 609 return; 610 } 611 work.setWorkInstanceState(State.UNKNOWN); 612 queuing.workCompleted(queueId, work); 613 } finally { 614 running.remove(work); 615 runningCount.dec(); 616 completedCount.inc(); 617 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 618 completionSynchronizer.signalCompletedWork(); 619 } 620 } 621 622 // called during shutdown 623 // with tasks from the queue if new tasks are submitted 624 // or with tasks drained from the queue 625 protected void removedFromQueue(Runnable r) { 626 Work work = WorkHolder.getWork(r); 627 work.setWorkInstanceState(State.UNKNOWN); 628 completionSynchronizer.signalCompletedWork(); 629 } 630 631 /** 632 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 633 */ 634 public void shutdownAndSuspend() { 635 // don't consume the queue anymore 636 ((NuxeoBlockingQueue) getQueue()).setActive(false); 637 // shutdown the executor 638 shutdown(); 639 // suspend and reschedule all running work 640 synchronized (running) { 641 for (Work work : running) { 642 work.setWorkInstanceSuspending(); 643 work.setWorkInstanceState(State.SCHEDULED); 644 queuing.workReschedule(queueId, work); 645 } 646 } 647 shutdownNow(); 648 } 649 650 public void removeScheduled(String workId) { 651 queuing.removeScheduled(queueId, workId); 652 } 653 654 } 655 656 @Override 657 public void schedule(Work work) { 658 schedule(work, Scheduling.ENQUEUE, false); 659 } 660 661 @Override 662 public void schedule(Work work, boolean afterCommit) { 663 schedule(work, Scheduling.ENQUEUE, afterCommit); 664 } 665 666 @Override 667 public void schedule(Work work, Scheduling scheduling) { 668 schedule(work, scheduling, false); 669 } 670 671 @Override 672 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 673 String workId = work.getId(); 674 String queueId = getCategoryQueueId(work.getCategory()); 675 if (!isQueuingEnabled(queueId)) { 676 return; 677 } 678 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 679 return; 680 } 681 work.setWorkInstanceState(State.SCHEDULED); 682 WorkSchedulePath.newInstance(work); 683 switch (scheduling) { 684 case ENQUEUE: 685 break; 686 case CANCEL_SCHEDULED: 687 getExecutor(queueId).removeScheduled(workId); 688 break; 689 case IF_NOT_SCHEDULED: 690 case IF_NOT_RUNNING: 691 case IF_NOT_RUNNING_OR_SCHEDULED: 692 // TODO disabled for now because hasWorkInState uses isScheduled 693 // which is buggy 694 boolean disabled = Boolean.TRUE.booleanValue(); 695 if (!disabled && hasWorkInState(workId, scheduling.state)) { 696 if (log.isDebugEnabled()) { 697 log.debug("Canceling schedule because found: " + scheduling); 698 } 699 return; 700 701 } 702 break; 703 704 } 705 queuing.workSchedule(queueId, work); 706 } 707 708 /** 709 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 710 * 711 * @since 5.8 712 */ 713 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 714 TransactionManager transactionManager; 715 try { 716 transactionManager = TransactionHelper.lookupTransactionManager(); 717 } catch (NamingException e) { 718 transactionManager = null; 719 } 720 if (transactionManager == null) { 721 if (log.isDebugEnabled()) { 722 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 723 } 724 return false; 725 } 726 try { 727 Transaction transaction = transactionManager.getTransaction(); 728 if (transaction == null) { 729 if (log.isDebugEnabled()) { 730 log.debug("Not scheduling work after commit because of missing transaction: " + work); 731 } 732 return false; 733 } 734 int status = transaction.getStatus(); 735 if (status == Status.STATUS_ACTIVE) { 736 if (log.isDebugEnabled()) { 737 log.debug("Scheduling work after commit: " + work); 738 } 739 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 740 return true; 741 } else if (status == Status.STATUS_COMMITTED) { 742 // called in afterCompletion, we can schedule immediately 743 if (log.isDebugEnabled()) { 744 log.debug("Scheduling work immediately: " + work); 745 } 746 return false; 747 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 748 if (log.isDebugEnabled()) { 749 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 750 } 751 return true; 752 } else { 753 if (log.isDebugEnabled()) { 754 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 755 + work); 756 } 757 return false; 758 } 759 } catch (SystemException | RollbackException e) { 760 log.error("Cannot schedule after commit", e); 761 return false; 762 } 763 } 764 765 @Override 766 @Deprecated 767 public Work find(Work work, State state, boolean useEquals, int[] pos) { 768 if (pos != null) { 769 pos[0] = 0; // compat 770 } 771 String workId = work.getId(); 772 return queuing.find(workId, state); 773 } 774 775 @Override 776 public Work find(String workId, State state) { 777 return queuing.find(workId, state); 778 } 779 780 /** 781 * @param state SCHEDULED, RUNNING or null for both 782 */ 783 protected boolean hasWorkInState(String workId, State state) { 784 return queuing.isWorkInState(workId, state); 785 } 786 787 @Override 788 public State getWorkState(String workId) { 789 return queuing.getWorkState(workId); 790 } 791 792 @Override 793 public List<Work> listWork(String queueId, State state) { 794 // don't return scheduled after commit 795 return queuing.listWork(queueId, state); 796 } 797 798 @Override 799 public List<String> listWorkIds(String queueId, State state) { 800 return queuing.listWorkIds(queueId, state); 801 } 802 803 @Override 804 public WorkQueueMetrics getMetrics(String queueId) { 805 return queuing.metrics(queueId); 806 } 807 808 @Override 809 public int getQueueSize(String queueId, State state) { 810 WorkQueueMetrics metrics = getMetrics(queueId); 811 if (state == null) { 812 return metrics.scheduled.intValue() + metrics.running.intValue(); 813 } 814 if (state == State.SCHEDULED) { 815 return metrics.scheduled.intValue(); 816 } else if (state == State.RUNNING) { 817 return metrics.running.intValue(); 818 } else { 819 throw new IllegalArgumentException(String.valueOf(state)); 820 } 821 } 822 823 @Override 824 @Deprecated 825 public int getNonCompletedWorkSize(String queueId) { 826 return getQueueSize(queueId, null); 827 } 828 829 @Override 830 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 831 return awaitCompletion(null, duration, unit); 832 } 833 834 @Override 835 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 836 if (!isStarted()) { 837 return true; 838 } 839 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 840 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 841 long deadline = getTimestampAfter(durationInMs); 842 int pause = (int) Math.min(duration, 500L); 843 log.debug("awaitForCompletion " + durationInMs + " ms"); 844 do { 845 if (noScheduledOrRunningWork(queueId)) { 846 completionSynchronizer.signalCompletedWork(); 847 SequenceTracer.stop("done"); 848 return true; 849 } 850 completionSynchronizer.waitForCompletedWork(pause); 851 } while (System.currentTimeMillis() < deadline); 852 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 853 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 854 return false; 855 } 856 857 protected long getTimestampAfter(long durationInMs) { 858 long ret = System.currentTimeMillis() + durationInMs; 859 if (ret < 0) { 860 ret = Long.MAX_VALUE; 861 } 862 return ret; 863 } 864 865 protected boolean noScheduledOrRunningWork(String queueId) { 866 if (queueId == null) { 867 for (String id : getWorkQueueIds()) { 868 if (!noScheduledOrRunningWork(id)) { 869 return false; 870 } 871 } 872 return true; 873 } 874 boolean ret = getQueueSize(queueId, null) == 0; 875 if (ret == false) { 876 if (log.isTraceEnabled()) { 877 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + 878 ", running: " + getQueueSize(queueId, State.RUNNING)); 879 } 880 return false; 881 } 882 if (log.isTraceEnabled()) { 883 log.trace(queueId + " is completed"); 884 } 885 return true; 886 } 887 888 @Override 889 public boolean isStarted() { 890 return started && !shutdownInProgress; 891 } 892 893}