001/* 002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.lang.reflect.Constructor; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ConcurrentLinkedQueue; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.ecm.core.event.EventServiceComponent; 049import org.nuxeo.ecm.core.work.api.Work; 050import org.nuxeo.ecm.core.work.api.Work.State; 051import org.nuxeo.ecm.core.work.api.WorkManager; 052import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 053import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor; 054import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 055import org.nuxeo.runtime.RuntimeServiceEvent; 056import org.nuxeo.runtime.RuntimeServiceListener; 057import org.nuxeo.runtime.api.Framework; 058import org.nuxeo.runtime.metrics.MetricsService; 059import org.nuxeo.runtime.model.ComponentContext; 060import org.nuxeo.runtime.model.ComponentInstance; 061import org.nuxeo.runtime.model.DefaultComponent; 062import org.nuxeo.runtime.transaction.TransactionHelper; 063 064import com.codahale.metrics.Counter; 065import com.codahale.metrics.MetricRegistry; 066import com.codahale.metrics.SharedMetricRegistries; 067import com.codahale.metrics.Timer; 068 069/** 070 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 071 * implementation. 072 * 073 * @since 5.6 074 */ 075public class WorkManagerImpl extends DefaultComponent implements WorkManager { 076 077 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 078 079 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 080 081 protected static final String QUEUES_EP = "queues"; 082 083 protected static final String IMPL_EP = "implementation"; 084 085 public static final String DEFAULT_QUEUE_ID = "default"; 086 087 public static final String DEFAULT_CATEGORY = "default"; 088 089 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 090 091 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 092 093 // @GuardedBy("itself") 094 protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this); 095 096 // used synchronized 097 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 098 099 protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class); 100 101 /** 102 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from 103 * another node in cluster mode. 104 */ 105 protected class WorkCompletionSynchronizer { 106 final protected ReentrantLock lock = new ReentrantLock(); 107 final protected Condition condition = lock.newCondition(); 108 109 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 110 lock.lock(); 111 try { 112 return condition.await(timeMs, TimeUnit.MILLISECONDS); 113 } finally { 114 lock.unlock(); 115 } 116 } 117 118 protected void signalCompletedWork() { 119 lock.lock(); 120 try { 121 condition.signalAll(); 122 } finally { 123 lock.unlock(); 124 } 125 } 126 127 } 128 129 protected WorkCompletionSynchronizer completionSynchronizer; 130 131 @Override 132 public void activate(ComponentContext context) { 133 Framework.addListener(new ShutdownListener()); 134 } 135 136 @Override 137 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 138 if (QUEUES_EP.equals(extensionPoint)) { 139 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 140 } else if (IMPL_EP.equals(extensionPoint)) { 141 registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution); 142 } else { 143 throw new RuntimeException("Unknown extension point: " + extensionPoint); 144 } 145 } 146 147 @Override 148 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 149 if (QUEUES_EP.equals(extensionPoint)) { 150 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 151 } else if (IMPL_EP.equals(extensionPoint)) { 152 unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution); 153 } else { 154 throw new RuntimeException("Unknown extension point: " + extensionPoint); 155 } 156 } 157 158 public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 159 String queueId = workQueueDescriptor.id; 160 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 161 Boolean processing = workQueueDescriptor.processing; 162 Boolean queuing = workQueueDescriptor.queuing; 163 if (processing == null && queuing == null) { 164 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 165 + " with no processing/queuing"); 166 return; 167 } 168 String what = processing == null ? "" : (" processing=" + processing); 169 what += queuing == null ? "" : (" queuing=" + queuing); 170 log.info("Setting on all work queues:" + what); 171 // activate/deactivate processing/queuing on all queues 172 List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy 173 for (String id : queueIds) { 174 // add an updated contribution redefining processing/queuing 175 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 176 wqd.id = id; 177 wqd.processing = processing; 178 wqd.queuing = queuing; 179 registerWorkQueueDescriptor(wqd); 180 } 181 return; 182 } 183 workQueueDescriptors.addContribution(workQueueDescriptor); 184 WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId); 185 log.info("Registered work queue " + queueId + " " + wqd.toString()); 186 } 187 188 public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 189 String id = workQueueDescriptor.id; 190 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 191 return; 192 } 193 workQueueDescriptors.removeContribution(workQueueDescriptor); 194 log.info("Unregistered work queue " + id); 195 } 196 197 protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) { 198 String id = workQueueDescriptor.id; 199 WorkThreadPoolExecutor executor = executors.get(id); 200 if (executor == null) { 201 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-"); 202 int maxPoolSize = workQueueDescriptor.getMaxThreads(); 203 executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory); 204 // prestart all core threads so that direct additions to the queue 205 // (from another Nuxeo instance) can be seen 206 executor.prestartAllCoreThreads(); 207 executors.put(id, executor); 208 } 209 NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue(); 210 // get merged contrib 211 // set active state 212 queue.setActive(workQueueDescriptor.isProcessingEnabled()); 213 log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString()); 214 } 215 216 public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) { 217 if (WorkQueueDescriptor.ALL_QUEUES.equals(workQueueDescriptor.id)) { 218 return; 219 } 220 WorkThreadPoolExecutor executor = executors.get(workQueueDescriptor.id); 221 executor.shutdownAndSuspend(); 222 log.info("Deactivated work queue " + workQueueDescriptor.id); 223 } 224 225 public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) { 226 WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass()); 227 registerWorkQueuing(q); 228 } 229 230 public void registerWorkQueuing(WorkQueuing q) { 231 closeQueuing(); 232 queuing = q; 233 } 234 235 public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) { 236 unregisterWorkQueing(); 237 } 238 239 public void unregisterWorkQueing() { 240 closeQueuing(); 241 queuing = newWorkQueuing(MemoryWorkQueuing.class); 242 } 243 244 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 245 WorkQueuing q; 246 try { 247 Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class, 248 WorkQueueDescriptorRegistry.class); 249 q = ctor.newInstance(this, workQueueDescriptors); 250 } catch (ReflectiveOperationException | SecurityException e) { 251 throw new RuntimeException(e); 252 } 253 return q; 254 } 255 256 protected void closeQueuing() { 257 try { 258 shutdown(5, TimeUnit.SECONDS); 259 } catch (InterruptedException e) { 260 Thread.currentThread().interrupt(); // restore interrupted status 261 throw new RuntimeException(e); 262 } 263 } 264 265 protected boolean isQueuingEnabled(String queueId) { 266 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 267 return wqd == null ? false : wqd.isQueuingEnabled(); 268 } 269 270 protected boolean isProcessingEnabled(String queueId) { 271 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 272 return wqd == null ? false : wqd.isProcessingEnabled(); 273 } 274 275 // ----- WorkManager ----- 276 277 @Override 278 public List<String> getWorkQueueIds() { 279 synchronized (workQueueDescriptors) { 280 return workQueueDescriptors.getQueueIds(); 281 } 282 } 283 284 @Override 285 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 286 synchronized (workQueueDescriptors) { 287 return workQueueDescriptors.get(queueId); 288 } 289 } 290 291 @Override 292 public String getCategoryQueueId(String category) { 293 if (category == null) { 294 category = DEFAULT_CATEGORY; 295 } 296 String queueId = workQueueDescriptors.getQueueId(category); 297 if (queueId == null) { 298 queueId = DEFAULT_QUEUE_ID; 299 } 300 return queueId; 301 } 302 303 @Override 304 public int getApplicationStartedOrder() { 305 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 306 } 307 308 @Override 309 public void applicationStarted(ComponentContext context) { 310 init(); 311 } 312 313 protected volatile boolean started = false; 314 315 protected volatile boolean shutdownInProgress = false; 316 317 @Override 318 public void init() { 319 if (started) { 320 return; 321 } 322 synchronized (this) { 323 if (started) { 324 return; 325 } 326 completionSynchronizer = new WorkCompletionSynchronizer(); 327 started = true; 328 queuing.init(); 329 for (String id : workQueueDescriptors.getQueueIds()) { 330 activateQueue(workQueueDescriptors.get(id)); 331 } 332 } 333 } 334 335 protected WorkThreadPoolExecutor getExecutor(String queueId) { 336 if (!started) { 337 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 338 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 339 init(); 340 } else { 341 throw new IllegalStateException("Work manager not started, could not access to executors"); 342 } 343 } 344 WorkQueueDescriptor workQueueDescriptor; 345 synchronized (workQueueDescriptors) { 346 workQueueDescriptor = workQueueDescriptors.get(queueId); 347 } 348 if (workQueueDescriptor == null) { 349 throw new IllegalArgumentException("No such work queue: " + queueId); 350 } 351 352 return executors.get(queueId); 353 } 354 355 @Override 356 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 357 WorkThreadPoolExecutor executor = getExecutor(queueId); 358 boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit); 359 removeExecutor(queueId); // start afresh 360 return terminated; 361 } 362 363 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 364 throws InterruptedException { 365 // mark executors as shutting down 366 for (WorkThreadPoolExecutor executor : list) { 367 executor.shutdownAndSuspend(); 368 } 369 // don't wait anymore, running will always abort 370 return true; 371 } 372 373 protected long remainingMillis(long t0, long delay) { 374 long d = System.currentTimeMillis() - t0; 375 if (d > delay) { 376 return 0; 377 } 378 return delay - d; 379 } 380 381 protected synchronized void removeExecutor(String queueId) { 382 executors.remove(queueId); 383 } 384 385 @Override 386 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 387 shutdownInProgress = true; 388 try { 389 return shutdownExecutors(executors.values(), timeout, unit); 390 } finally { 391 shutdownInProgress = false; 392 started = false; 393 completionSynchronizer = null; 394 executors.clear(); 395 } 396 } 397 398 protected class ShutdownListener implements RuntimeServiceListener { 399 @Override 400 public void handleEvent(RuntimeServiceEvent event) { 401 if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) { 402 return; 403 } 404 Framework.removeListener(this); 405 closeQueuing(); 406 } 407 } 408 409 /** 410 * A work instance and how to schedule it, for schedule-after-commit. 411 * 412 * @since 5.8 413 */ 414 public class WorkScheduling implements Synchronization { 415 public final Work work; 416 417 public final Scheduling scheduling; 418 419 public WorkScheduling(Work work, Scheduling scheduling) { 420 this.work = work; 421 this.scheduling = scheduling; 422 } 423 424 @Override 425 public void beforeCompletion() { 426 } 427 428 @Override 429 public void afterCompletion(int status) { 430 if (status == Status.STATUS_COMMITTED) { 431 schedule(work, scheduling, false); 432 } else if (status == Status.STATUS_ROLLEDBACK) { 433 work.setWorkInstanceState(State.CANCELED); 434 } else { 435 throw new IllegalArgumentException("Unsupported transaction status " + status); 436 } 437 } 438 } 439 440 /** 441 * Creates non-daemon threads at normal priority. 442 */ 443 private static class NamedThreadFactory implements ThreadFactory { 444 445 private final AtomicInteger threadNumber = new AtomicInteger(); 446 447 private final ThreadGroup group; 448 449 private final String prefix; 450 451 public NamedThreadFactory(String prefix) { 452 SecurityManager sm = System.getSecurityManager(); 453 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 454 this.prefix = prefix; 455 } 456 457 @Override 458 public Thread newThread(Runnable r) { 459 String name = prefix + threadNumber.incrementAndGet(); 460 Thread thread = new Thread(group, r, name); 461 // do not set daemon 462 thread.setPriority(Thread.NORM_PRIORITY); 463 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 464 465 @Override 466 public void uncaughtException(Thread t, Throwable e) { 467 LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e); 468 } 469 }); 470 return thread; 471 } 472 } 473 474 /** 475 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 476 * <p> 477 * Completed tasks are passed to another queue. 478 * <p> 479 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 480 * persisted, etc). 481 * 482 * @since 5.6 483 */ 484 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 485 486 protected final String queueId; 487 488 /** 489 * List of running Work instances, in order to be able to interrupt them if requested. 490 */ 491 // @GuardedBy("itself") 492 protected final ConcurrentLinkedQueue<Work> running; 493 494 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 495 // Number of work scheduled by this instance 496 protected final Counter scheduledCount; 497 498 // Number of work currently running on this instance 499 protected final Counter runningCount; 500 501 // Number of work completed by this instance 502 protected final Counter completedCount; 503 504 protected final Timer workTimer; 505 506 protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime, 507 TimeUnit unit, ThreadFactory threadFactory) { 508 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initWorkQueue(queueId), threadFactory); 509 this.queueId = queueId; 510 running = new ConcurrentLinkedQueue<Work>(); 511 // init metrics 512 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 513 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 514 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 515 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 516 } 517 518 public int getScheduledOrRunningSize() { 519 int ret = 0; 520 for (String queueId : getWorkQueueIds()) { 521 ret += getQueueSize(queueId, null); 522 } 523 return ret; 524 } 525 526 @Override 527 public void execute(Runnable r) { 528 throw new UnsupportedOperationException("use other api"); 529 } 530 531 /** 532 * Executes the given task sometime in the future. 533 * 534 * @param work the work to execute 535 * @see #execute(Runnable) 536 */ 537 public void execute(Work work) { 538 scheduledCount.inc(); 539 submit(work); 540 } 541 542 /** 543 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 544 * directly 545 * 546 * @param work 547 * @throws RuntimeException 548 */ 549 protected void submit(Work work) throws RuntimeException { 550 boolean added = queuing.workSchedule(queueId, work); 551 if (!added) { 552 queuing.removeScheduled(queueId, work.getId()); 553 throw new RuntimeException("queue should have blocked"); 554 } 555 // DO NOT super.execute(new WorkHolder(work)); 556 } 557 558 @Override 559 protected void beforeExecute(Thread t, Runnable r) { 560 Work work = WorkHolder.getWork(r); 561 if (started == false || shutdownInProgress == true) { 562 work.setWorkInstanceState(State.SCHEDULED); 563 queuing.workSchedule(queueId, work); 564 return; 565 } 566 work.setWorkInstanceState(State.RUNNING); 567 queuing.workRunning(queueId, work); 568 running.add(work); 569 runningCount.inc(); 570 } 571 572 @Override 573 protected void afterExecute(Runnable r, Throwable t) { 574 Work work = WorkHolder.getWork(r); 575 try { 576 if (work.isSuspending()) { 577 return; 578 } 579 work.setWorkInstanceState(t == null ? State.COMPLETED : State.FAILED); 580 queuing.workCompleted(queueId, work); 581 } finally { 582 // metrics 583 running.remove(work); 584 runningCount.dec(); 585 completedCount.inc(); 586 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 587 completionSynchronizer.signalCompletedWork(); 588 } 589 } 590 591 // called during shutdown 592 // with tasks from the queue if new tasks are submitted 593 // or with tasks drained from the queue 594 protected void removedFromQueue(Runnable r) { 595 Work work = WorkHolder.getWork(r); 596 work.setWorkInstanceState(State.CANCELED); 597 completionSynchronizer.signalCompletedWork(); 598 } 599 600 /** 601 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 602 */ 603 public void shutdownAndSuspend() { 604 // don't consume the queue anymore 605 ((NuxeoBlockingQueue) getQueue()).setActive(false); 606 // shutdown the executor 607 shutdown(); 608 // suspend and reschedule all running work 609 synchronized (running) { 610 for (Work work : running) { 611 log.debug("re-scheduling " + work); 612 work.setWorkInstanceState(State.SCHEDULED); 613 queuing.workSchedule(queueId, work); 614 work.setWorkInstanceSuspending(); 615 } 616 } 617 shutdownNow(); 618 } 619 620 /** 621 * Blocks until all work instances have completed after a shutdown and suspend request. 622 * 623 * @param timeout the time to wait 624 * @param unit the timeout unit 625 * @return true if all work stopped or was saved, false if some remaining after timeout 626 */ 627 public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException { 628 boolean terminated = super.awaitTermination(timeout, unit); 629 if (!terminated) { 630 // drain queue from remaining scheduled work 631 List<Runnable> drained = new ArrayList<>(); 632 getQueue().drainTo(drained); 633 for (Runnable r : drained) { 634 removedFromQueue(r); 635 } 636 } 637 // some work still remaining after timeout 638 return terminated; 639 } 640 641 public Work removeScheduled(String workId) { 642 Work w = queuing.removeScheduled(queueId, workId); 643 return w; 644 } 645 646 } 647 648 @Override 649 public void schedule(Work work) { 650 schedule(work, Scheduling.ENQUEUE, false); 651 } 652 653 @Override 654 public void schedule(Work work, boolean afterCommit) { 655 schedule(work, Scheduling.ENQUEUE, afterCommit); 656 } 657 658 @Override 659 public void schedule(Work work, Scheduling scheduling) { 660 schedule(work, scheduling, false); 661 } 662 663 @Override 664 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 665 String workId = work.getId(); 666 String queueId = getCategoryQueueId(work.getCategory()); 667 if (!isQueuingEnabled(queueId)) { 668 return; 669 } 670 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 671 return; 672 } 673 work.setWorkInstanceState(State.SCHEDULED); 674 WorkSchedulePath.newInstance(work); 675 if (log.isTraceEnabled()) { 676 log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack()); 677 } else if (log.isDebugEnabled()) { 678 log.debug("Scheduling work: " + work + " using queue: " + queueId); 679 } 680 switch (scheduling) { 681 case ENQUEUE: 682 break; 683 case CANCEL_SCHEDULED: 684 Work w = getExecutor(queueId).removeScheduled(workId); 685 if (w != null) { 686 w.setWorkInstanceState(State.CANCELED); 687 if (log.isDebugEnabled()) { 688 log.debug("Canceling existing scheduled work before scheduling"); 689 } 690 } 691 break; 692 case IF_NOT_SCHEDULED: 693 case IF_NOT_RUNNING: 694 case IF_NOT_RUNNING_OR_SCHEDULED: 695 // TODO disabled for now because hasWorkInState uses isScheduled 696 // which is buggy 697 boolean disabled = Boolean.TRUE.booleanValue(); 698 if (!disabled && hasWorkInState(workId, scheduling.state)) { 699 // mark passed work as canceled 700 work.setWorkInstanceState(State.CANCELED); 701 if (log.isDebugEnabled()) { 702 log.debug("Canceling schedule because found: " + scheduling); 703 } 704 return; 705 706 } 707 break; 708 709 } 710 getExecutor(queueId).execute(work); 711 } 712 713 /** 714 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 715 * 716 * @since 5.8 717 */ 718 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 719 TransactionManager transactionManager; 720 try { 721 transactionManager = TransactionHelper.lookupTransactionManager(); 722 } catch (NamingException e) { 723 transactionManager = null; 724 } 725 if (transactionManager == null) { 726 if (log.isDebugEnabled()) { 727 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 728 } 729 return false; 730 } 731 try { 732 Transaction transaction = transactionManager.getTransaction(); 733 if (transaction == null) { 734 if (log.isDebugEnabled()) { 735 log.debug("Not scheduling work after commit because of missing transaction: " + work); 736 } 737 return false; 738 } 739 int status = transaction.getStatus(); 740 if (status == Status.STATUS_ACTIVE) { 741 if (log.isDebugEnabled()) { 742 log.debug("Scheduling work after commit: " + work); 743 } 744 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 745 return true; 746 } else if (status == Status.STATUS_COMMITTED) { 747 // called in afterCompletion, we can schedule immediately 748 if (log.isDebugEnabled()) { 749 log.debug("Scheduling work immediately: " + work); 750 } 751 return false; 752 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 753 if (log.isDebugEnabled()) { 754 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 755 } 756 work.setWorkInstanceState(State.CANCELED); 757 return true; 758 } else { 759 if (log.isDebugEnabled()) { 760 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 761 + work); 762 } 763 return false; 764 } 765 } catch (SystemException | RollbackException e) { 766 log.error("Cannot schedule after commit", e); 767 return false; 768 } 769 } 770 771 @Override 772 @Deprecated 773 public Work find(Work work, State state, boolean useEquals, int[] pos) { 774 if (pos != null) { 775 pos[0] = 0; // compat 776 } 777 String workId = work.getId(); 778 return queuing.find(workId, state); 779 } 780 781 @Override 782 public Work find(String workId, State state) { 783 return queuing.find(workId, state); 784 } 785 786 @Override 787 public String findResult(String workId) { 788 Work work = find(workId, State.COMPLETED); 789 return work != null ? work.getWorkInstanceResult() : null; 790 } 791 792 /** 793 * @param state SCHEDULED, RUNNING or null for both 794 */ 795 protected boolean hasWorkInState(String workId, State state) { 796 return queuing.isWorkInState(workId, state); 797 } 798 799 @Override 800 public State getWorkState(String workId) { 801 return queuing.getWorkState(workId); 802 } 803 804 @Override 805 public List<Work> listWork(String queueId, State state) { 806 // don't return scheduled after commit 807 return queuing.listWork(queueId, state); 808 } 809 810 @Override 811 public List<String> listWorkIds(String queueId, State state) { 812 return queuing.listWorkIds(queueId, state); 813 } 814 815 @Override 816 public int getQueueSize(String queueId, State state) { 817 if (state == null) { 818 return getScheduledOrRunningSize(queueId); 819 } 820 if (state == State.SCHEDULED) { 821 return getScheduledSize(queueId); 822 } else if (state == State.RUNNING) { 823 return getRunningSize(queueId); 824 } else if (state == State.COMPLETED) { 825 return getCompletedSize(queueId); 826 } else { 827 throw new IllegalArgumentException(String.valueOf(state)); 828 } 829 } 830 831 @Override 832 @Deprecated 833 public int getNonCompletedWorkSize(String queueId) { 834 return getScheduledOrRunningSize(queueId); 835 } 836 837 protected int getScheduledSize(String queueId) { 838 return queuing.count(queueId, State.SCHEDULED); 839 } 840 841 protected int getRunningSize(String queueId) { 842 return queuing.count(queueId, State.RUNNING); 843 } 844 845 protected int getScheduledOrRunningSize(String queueId) { 846 return getScheduledSize(queueId) + getRunningSize(queueId); 847 } 848 849 protected int getCompletedSize(String queueId) { 850 return queuing.count(queueId, State.COMPLETED); 851 } 852 853 @Override 854 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 855 return awaitCompletion(null, duration, unit); 856 } 857 858 @Override 859 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 860 if (!isStarted()) { 861 return true; 862 } 863 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 864 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 865 long deadline = getTimestampAfter(durationInMs); 866 int pause = (int) Math.min(duration, 500L); 867 log.debug("awaitForCompletion " + durationInMs + " ms"); 868 do { 869 if (noScheduledOrRunningWork(queueId)) { 870 log.debug("Completed"); 871 completionSynchronizer.signalCompletedWork(); 872 SequenceTracer.stop("done"); 873 return true; 874 } 875 completionSynchronizer.waitForCompletedWork(pause); 876 } while (System.currentTimeMillis() < deadline); 877 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 878 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 879 return false; 880 } 881 882 protected long getTimestampAfter(long durationInMs) { 883 long ret = System.currentTimeMillis() + durationInMs; 884 if (ret < 0) { 885 ret = Long.MAX_VALUE; 886 } 887 return ret; 888 } 889 890 protected boolean noScheduledOrRunningWork(String queueId) { 891 if (queueId != null) { 892 boolean ret = getQueueSize(queueId, null) == 0; 893 if (ret == false) { 894 if (log.isTraceEnabled()) { 895 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + 896 ", running: " + getQueueSize(queueId, State.RUNNING)); 897 } 898 } 899 return ret; 900 } 901 for (String id : getWorkQueueIds()) { 902 if (getQueueSize(id, null) > 0) { 903 if (log.isTraceEnabled()) { 904 log.trace(id + " not empty, sched: " + getQueueSize(id, State.SCHEDULED) + 905 ", running: " + getQueueSize(id, State.RUNNING)); 906 } 907 return false; 908 } 909 } 910 return true; 911 } 912 913 @Override 914 public synchronized void clearCompletedWork(String queueId) { 915 queuing.clearCompletedWork(queueId, 0); 916 } 917 918 @Override 919 public synchronized void clearCompletedWork(long completionTime) { 920 for (String queueId : queuing.getCompletedQueueIds()) { 921 queuing.clearCompletedWork(queueId, completionTime); 922 } 923 } 924 925 @Override 926 public synchronized void cleanup() { 927 log.debug("Clearing old completed work"); 928 for (String queueId : queuing.getCompletedQueueIds()) { 929 WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId); 930 if (workQueueDescriptor == null) { 931 // unknown queue 932 continue; 933 } 934 long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L; 935 if (delay > 0) { 936 long completionTime = System.currentTimeMillis() - delay; 937 queuing.clearCompletedWork(queueId, completionTime); 938 } 939 } 940 } 941 942 @Override 943 public boolean isStarted() { 944 return started && !shutdownInProgress; 945 } 946 947}