001/* 002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.common.utils.ExceptionUtils; 049import org.nuxeo.ecm.core.api.NuxeoException; 050import org.nuxeo.ecm.core.event.EventServiceComponent; 051import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 052import org.nuxeo.ecm.core.work.api.Work; 053import org.nuxeo.ecm.core.work.api.Work.State; 054import org.nuxeo.ecm.core.work.api.WorkManager; 055import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 056import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 057import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 058import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.metrics.MetricsService; 061import org.nuxeo.runtime.metrics.NuxeoMetricSet; 062import org.nuxeo.runtime.model.ComponentContext; 063import org.nuxeo.runtime.model.ComponentInstance; 064import org.nuxeo.runtime.model.ComponentManager; 065import org.nuxeo.runtime.model.DefaultComponent; 066import org.nuxeo.runtime.transaction.TransactionHelper; 067 068import com.codahale.metrics.Counter; 069import com.codahale.metrics.MetricRegistry; 070import com.codahale.metrics.SharedMetricRegistries; 071import com.codahale.metrics.Timer; 072 073/** 074 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 075 * implementation. 076 * 077 * @since 5.6 078 */ 079public class WorkManagerImpl extends DefaultComponent implements WorkManager { 080 081 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 082 083 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 084 085 protected static final String QUEUES_EP = "queues"; 086 087 protected static final String IMPL_EP = "implementation"; 088 089 public static final String DEFAULT_QUEUE_ID = "default"; 090 091 public static final String DEFAULT_CATEGORY = "default"; 092 093 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 094 095 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 096 097 // @GuardedBy("itself") 098 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 099 100 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 101 102 // used synchronized 103 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 104 105 protected WorkQueuing queuing; 106 107 /** 108 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 109 * node in cluster mode. 110 */ 111 protected class WorkCompletionSynchronizer { 112 final protected ReentrantLock lock = new ReentrantLock(); 113 114 final protected Condition condition = lock.newCondition(); 115 116 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 117 lock.lock(); 118 try { 119 return condition.await(timeMs, TimeUnit.MILLISECONDS); 120 } finally { 121 lock.unlock(); 122 } 123 } 124 125 protected void signalCompletedWork() { 126 lock.lock(); 127 try { 128 condition.signalAll(); 129 } finally { 130 lock.unlock(); 131 } 132 } 133 134 } 135 136 protected WorkCompletionSynchronizer completionSynchronizer; 137 138 @Override 139 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 140 if (QUEUES_EP.equals(extensionPoint)) { 141 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 142 } else if (IMPL_EP.equals(extensionPoint)) { 143 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 144 } else { 145 throw new RuntimeException("Unknown extension point: " + extensionPoint); 146 } 147 } 148 149 @Override 150 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 151 if (QUEUES_EP.equals(extensionPoint)) { 152 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 153 } else if (IMPL_EP.equals(extensionPoint)) { 154 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 155 } else { 156 throw new RuntimeException("Unknown extension point: " + extensionPoint); 157 } 158 } 159 160 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 161 String queueId = workQueueDescriptor.id; 162 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 163 Boolean processing = workQueueDescriptor.processing; 164 if (processing == null) { 165 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 166 + " with no processing/queuing"); 167 return; 168 } 169 String what = " processing=" + processing; 170 what += queuing == null ? "" : (" queuing=" + queuing); 171 log.info("Setting on all work queues:" + what); 172 // activate/deactivate processing/queuing on all queues 173 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 174 for (String id : queueIds) { 175 // add an updated contribution redefining processing/queuing 176 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 177 wqd.id = id; 178 wqd.processing = processing; 179 registerWorkQueueDescriptor(wqd); 180 } 181 return; 182 } 183 workQueueConfig.addContribution(workQueueDescriptor); 184 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 185 log.info("Registered work queue " + queueId + " " + wqd.toString()); 186 } 187 188 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 189 String id = workQueueDescriptor.id; 190 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 191 return; 192 } 193 workQueueConfig.removeContribution(workQueueDescriptor); 194 log.info("Unregistered work queue " + id); 195 } 196 197 void initializeQueue(WorkQueueDescriptor config) { 198 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 199 throw new IllegalArgumentException("cannot initialize all queues"); 200 } 201 if (queuing.getQueue(config.id) != null) { 202 throw new IllegalStateException("work queue " + config.id + " is already initialized"); 203 } 204 if (executors.containsKey(config.id)) { 205 throw new IllegalStateException("work queue " + config.id + " already have an executor"); 206 } 207 NuxeoBlockingQueue queue = queuing.init(config); 208 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 209 int maxPoolSize = config.getMaxThreads(); 210 WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, 211 queue, threadFactory); 212 // prestart all core threads so that direct additions to the queue 213 // (from another Nuxeo instance) can be seen 214 executor.prestartAllCoreThreads(); 215 executors.put(config.id, executor); 216 log.info("Initialized work queue " + config.id + " " + config.toEffectiveString()); 217 } 218 219 void activateQueue(WorkQueueDescriptor config) { 220 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 221 throw new IllegalArgumentException("cannot activate all queues"); 222 } 223 queuing.setActive(config.id, config.isProcessingEnabled()); 224 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 225 // Enable metrics 226 if (config.isProcessingEnabled()) { 227 activateQueueMetrics(config.id); 228 } 229 } 230 231 void deactivateQueue(WorkQueueDescriptor config) { 232 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 233 throw new IllegalArgumentException("cannot deactivate all queues"); 234 } 235 // Disable metrics 236 if (config.isProcessingEnabled()) { 237 deactivateQueueMetrics(config.id); 238 } 239 queuing.setActive(config.id, false); 240 log.info("Deactivated work queue " + config.id); 241 } 242 243 void activateQueueMetrics(String queueId) { 244 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 245 queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled"); 246 queueMetrics.putGauge(() -> getMetrics(queueId).running, "running"); 247 queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed"); 248 queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled"); 249 registry.registerAll(queueMetrics); 250 } 251 252 void deactivateQueueMetrics(String queueId) { 253 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 254 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 255 } 256 257 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 258 workQueuingConfig.addContribution(descr); 259 } 260 261 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 262 workQueuingConfig.removeContribution(descr); 263 } 264 265 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 266 try { 267 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 268 } catch (ReflectiveOperationException | SecurityException e) { 269 throw new RuntimeException(e); 270 } 271 } 272 273 @Override 274 public boolean isQueuingEnabled(String queueId) { 275 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 276 return wqd != null && wqd.isQueuingEnabled(); 277 } 278 279 @Override 280 public void enableProcessing(boolean value) { 281 workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value)); 282 } 283 284 @Override 285 public void enableProcessing(String queueId, boolean value) { 286 WorkQueueDescriptor config = workQueueConfig.get(queueId); 287 if (config == null) { 288 throw new IllegalArgumentException("no such queue " + queueId); 289 } 290 if (!value) { 291 deactivateQueue(config); 292 } else { 293 activateQueue(config); 294 } 295 } 296 297 @Override 298 public boolean isProcessingEnabled() { 299 for (String queueId : workQueueConfig.getQueueIds()) { 300 if (queuing.getQueue(queueId).active) { 301 return true; 302 } 303 } 304 return false; 305 } 306 307 @Override 308 public boolean isProcessingEnabled(String queueId) { 309 if (queueId == null) { 310 return isProcessingEnabled(); 311 } 312 return queuing.getQueue(queueId).active; 313 } 314 315 // ----- WorkManager ----- 316 317 @Override 318 public List<String> getWorkQueueIds() { 319 synchronized (workQueueConfig) { 320 return workQueueConfig.getQueueIds(); 321 } 322 } 323 324 @Override 325 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 326 synchronized (workQueueConfig) { 327 return workQueueConfig.get(queueId); 328 } 329 } 330 331 @Override 332 public String getCategoryQueueId(String category) { 333 if (category == null) { 334 category = DEFAULT_CATEGORY; 335 } 336 String queueId = workQueueConfig.getQueueId(category); 337 if (queueId == null) { 338 queueId = DEFAULT_QUEUE_ID; 339 } 340 return queueId; 341 } 342 343 @Override 344 public int getApplicationStartedOrder() { 345 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 346 } 347 348 @Override 349 public void start(ComponentContext context) { 350 init(); 351 } 352 353 @Override 354 public void stop(ComponentContext context) throws InterruptedException { 355 // do nothing: we shutdown the thread pool at first before stopping any component 356 // see ComponentManager.Listener.beforeStop 357 } 358 359 protected volatile boolean started = false; 360 361 protected volatile boolean shutdownInProgress = false; 362 363 @Override 364 public void init() { 365 if (started) { 366 return; 367 } 368 synchronized (this) { 369 if (started) { 370 return; 371 } 372 queuing = newWorkQueuing(workQueuingConfig.klass); 373 completionSynchronizer = new WorkCompletionSynchronizer(); 374 started = true; 375 workQueueConfig.index(); 376 for (String id : workQueueConfig.getQueueIds()) { 377 initializeQueue(workQueueConfig.get(id)); 378 } 379 380 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 381 @Override 382 public void beforeStop(ComponentManager mgr, boolean isStandby) { 383 for (String id : workQueueConfig.getQueueIds()) { 384 deactivateQueue(workQueueConfig.get(id)); 385 } 386 try { 387 if (!shutdown(10, TimeUnit.SECONDS)) { 388 log.error("Some processors are still active"); 389 } 390 } catch (InterruptedException e) { 391 Thread.currentThread().interrupt(); 392 throw new NuxeoException("Interrupted while stopping work manager thread pools", e); 393 } 394 } 395 396 @Override 397 public void afterStart(ComponentManager mgr, boolean isResume) { 398 for (String id : workQueueConfig.getQueueIds()) { 399 activateQueue(workQueueConfig.get(id)); 400 } 401 } 402 403 @Override 404 public void afterStop(ComponentManager mgr, boolean isStandby) { 405 Framework.getRuntime().getComponentManager().removeListener(this); 406 } 407 }); 408 } 409 } 410 411 protected WorkThreadPoolExecutor getExecutor(String queueId) { 412 if (!started) { 413 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 414 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 415 init(); 416 } else { 417 throw new IllegalStateException("Work manager not started, could not access to executors"); 418 } 419 } 420 WorkQueueDescriptor workQueueDescriptor; 421 synchronized (workQueueConfig) { 422 workQueueDescriptor = workQueueConfig.get(queueId); 423 } 424 if (workQueueDescriptor == null) { 425 throw new IllegalArgumentException("No such work queue: " + queueId); 426 } 427 428 return executors.get(queueId); 429 } 430 431 @Override 432 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 433 WorkThreadPoolExecutor executor = getExecutor(queueId); 434 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 435 } 436 437 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 438 throws InterruptedException { 439 // mark executors as shutting down 440 for (WorkThreadPoolExecutor executor : list) { 441 executor.shutdownAndSuspend(); 442 } 443 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 444 // wait until threads termination 445 for (WorkThreadPoolExecutor executor : list) { 446 long t0 = System.currentTimeMillis(); 447 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 448 return false; 449 } 450 timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS); 451 } 452 return true; 453 } 454 455 protected long remainingMillis(long t0, long delay) { 456 long d = System.currentTimeMillis() - t0; 457 if (d > delay) { 458 return 0; 459 } 460 return delay - d; 461 } 462 463 protected synchronized void removeExecutor(String queueId) { 464 executors.remove(queueId); 465 } 466 467 @Override 468 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 469 shutdownInProgress = true; 470 try { 471 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 472 } finally { 473 shutdownInProgress = false; 474 started = false; 475 } 476 } 477 478 /** 479 * A work instance and how to schedule it, for schedule-after-commit. 480 * 481 * @since 5.8 482 */ 483 public class WorkScheduling implements Synchronization { 484 public final Work work; 485 486 public final Scheduling scheduling; 487 488 public WorkScheduling(Work work, Scheduling scheduling) { 489 this.work = work; 490 this.scheduling = scheduling; 491 } 492 493 @Override 494 public void beforeCompletion() { 495 } 496 497 @Override 498 public void afterCompletion(int status) { 499 if (status == Status.STATUS_COMMITTED) { 500 schedule(work, scheduling, false); 501 } else if (status == Status.STATUS_ROLLEDBACK) { 502 work.setWorkInstanceState(State.UNKNOWN); 503 } else { 504 throw new IllegalArgumentException("Unsupported transaction status " + status); 505 } 506 } 507 } 508 509 /** 510 * Creates non-daemon threads at normal priority. 511 */ 512 private static class NamedThreadFactory implements ThreadFactory { 513 514 private final AtomicInteger threadNumber = new AtomicInteger(); 515 516 private final ThreadGroup group; 517 518 private final String prefix; 519 520 public NamedThreadFactory(String prefix) { 521 SecurityManager sm = System.getSecurityManager(); 522 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 523 this.prefix = prefix; 524 } 525 526 @Override 527 public Thread newThread(Runnable r) { 528 String name = prefix + threadNumber.incrementAndGet(); 529 Thread thread = new Thread(group, r, name); 530 // do not set daemon 531 thread.setPriority(Thread.NORM_PRIORITY); 532 thread.setUncaughtExceptionHandler(this::handleUncaughtException); 533 return thread; 534 } 535 536 protected void handleUncaughtException(Thread t, Throwable e) { 537 Log logLocal = LogFactory.getLog(WorkManagerImpl.class); 538 if (e instanceof RejectedExecutionException) { 539 // we are responsible of this exception, we use it during shutdown phase to not run the task taken just 540 // before shutdown due to race condition, so log it as WARN 541 logLocal.warn("Rejected execution error on thread " + t.getName(), e); 542 } else if (ExceptionUtils.hasInterruptedCause(e)) { 543 logLocal.warn("Interrupted error on thread" + t.getName(), e); 544 } else { 545 logLocal.error(String.format("Uncaught error on thread: %s, " 546 + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e); 547 } 548 } 549 550 } 551 552 /** 553 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 554 * <p> 555 * Completed tasks are passed to another queue. 556 * <p> 557 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 558 * persisted, etc). 559 * 560 * @since 5.6 561 */ 562 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 563 564 protected final String queueId; 565 566 /** 567 * List of running Work instances, in order to be able to interrupt them if requested. 568 */ 569 // @GuardedBy("itself") 570 protected final ConcurrentLinkedQueue<Work> running; 571 572 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 573 // Number of work scheduled by this instance 574 protected final Counter scheduledCount; 575 576 // Number of work currently running on this instance 577 protected final Counter runningCount; 578 579 // Number of work completed by this instance 580 protected final Counter completedCount; 581 582 protected final Timer workTimer; 583 584 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 585 NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 586 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 587 queueId = queue.queueId; 588 running = new ConcurrentLinkedQueue<>(); 589 // init metrics 590 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 591 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 592 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 593 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 594 } 595 596 public int getScheduledOrRunningSize() { 597 int ret = 0; 598 for (String queueId : getWorkQueueIds()) { 599 ret += getQueueSize(queueId, null); 600 } 601 return ret; 602 } 603 604 @Override 605 public void execute(Runnable r) { 606 throw new UnsupportedOperationException("use other api"); 607 } 608 609 /** 610 * Executes the given task sometime in the future. 611 * 612 * @param work the work to execute 613 * @see #execute(Runnable) 614 */ 615 public void execute(Work work) { 616 scheduledCount.inc(); 617 submit(work); 618 } 619 620 /** 621 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 622 * directly 623 */ 624 protected void submit(Work work) throws RuntimeException { 625 queuing.workSchedule(queueId, work); 626 } 627 628 @Override 629 protected void beforeExecute(Thread t, Runnable r) { 630 Work work = WorkHolder.getWork(r); 631 if (isShutdown()) { 632 work.setWorkInstanceState(State.SCHEDULED); 633 queuing.workReschedule(queueId, work); 634 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 635 } 636 work.setWorkInstanceState(State.RUNNING); 637 queuing.workRunning(queueId, work); 638 running.add(work); 639 runningCount.inc(); 640 } 641 642 @Override 643 protected void afterExecute(Runnable r, Throwable t) { 644 Work work = WorkHolder.getWork(r); 645 try { 646 if (work.isSuspending()) { 647 log.trace(work + " is suspending, giving up"); 648 return; 649 } 650 if (isShutdown()) { 651 log.trace("rescheduling " + work.getId(), t); 652 work.setWorkInstanceState(State.SCHEDULED); 653 queuing.workReschedule(queueId, work); 654 return; 655 } 656 work.setWorkInstanceState(State.UNKNOWN); 657 queuing.workCompleted(queueId, work); 658 } finally { 659 running.remove(work); 660 runningCount.dec(); 661 completedCount.inc(); 662 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 663 completionSynchronizer.signalCompletedWork(); 664 } 665 } 666 667 /** 668 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 669 * 670 * @throws InterruptedException 671 */ 672 public void shutdownAndSuspend() throws InterruptedException { 673 try { 674 // don't consume the queue anymore 675 deactivateQueueMetrics(queueId); 676 queuing.setActive(queueId, false); 677 // suspend all running work 678 for (Work work : running) { 679 work.setWorkInstanceSuspending(); 680 log.trace("suspending and rescheduling " + work.getId()); 681 work.setWorkInstanceState(State.SCHEDULED); 682 queuing.workReschedule(queueId, work); 683 } 684 shutdownNow(); 685 } finally { 686 executors.remove(queueId); 687 } 688 } 689 690 public void removeScheduled(String workId) { 691 queuing.removeScheduled(queueId, workId); 692 } 693 694 } 695 696 @Override 697 public void schedule(Work work) { 698 schedule(work, Scheduling.ENQUEUE, false); 699 } 700 701 @Override 702 public void schedule(Work work, boolean afterCommit) { 703 schedule(work, Scheduling.ENQUEUE, afterCommit); 704 } 705 706 @Override 707 public void schedule(Work work, Scheduling scheduling) { 708 schedule(work, scheduling, false); 709 } 710 711 @Override 712 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 713 String workId = work.getId(); 714 String queueId = getCategoryQueueId(work.getCategory()); 715 if (!isQueuingEnabled(queueId)) { 716 return; 717 } 718 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 719 return; 720 } 721 work.setWorkInstanceState(State.SCHEDULED); 722 WorkSchedulePath.newInstance(work); 723 switch (scheduling) { 724 case ENQUEUE: 725 break; 726 case CANCEL_SCHEDULED: 727 getExecutor(queueId).removeScheduled(workId); 728 break; 729 case IF_NOT_SCHEDULED: 730 case IF_NOT_RUNNING_OR_SCHEDULED: 731 // TODO disabled for now because hasWorkInState uses isScheduled 732 // which is buggy 733 boolean disabled = Boolean.TRUE.booleanValue(); 734 if (!disabled && hasWorkInState(workId, scheduling.state)) { 735 if (log.isDebugEnabled()) { 736 log.debug("Canceling schedule because found: " + scheduling); 737 } 738 return; 739 740 } 741 break; 742 743 } 744 queuing.workSchedule(queueId, work); 745 } 746 747 /** 748 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 749 * 750 * @since 5.8 751 */ 752 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 753 TransactionManager transactionManager; 754 try { 755 transactionManager = TransactionHelper.lookupTransactionManager(); 756 } catch (NamingException e) { 757 transactionManager = null; 758 } 759 if (transactionManager == null) { 760 if (log.isDebugEnabled()) { 761 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 762 } 763 return false; 764 } 765 try { 766 Transaction transaction = transactionManager.getTransaction(); 767 if (transaction == null) { 768 if (log.isDebugEnabled()) { 769 log.debug("Not scheduling work after commit because of missing transaction: " + work); 770 } 771 return false; 772 } 773 int status = transaction.getStatus(); 774 if (status == Status.STATUS_ACTIVE) { 775 if (log.isDebugEnabled()) { 776 log.debug("Scheduling work after commit: " + work); 777 } 778 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 779 return true; 780 } else if (status == Status.STATUS_COMMITTED) { 781 // called in afterCompletion, we can schedule immediately 782 if (log.isDebugEnabled()) { 783 log.debug("Scheduling work immediately: " + work); 784 } 785 return false; 786 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 787 if (log.isDebugEnabled()) { 788 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 789 } 790 return true; 791 } else { 792 if (log.isDebugEnabled()) { 793 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 794 + work); 795 } 796 return false; 797 } 798 } catch (SystemException | RollbackException e) { 799 log.error("Cannot schedule after commit", e); 800 return false; 801 } 802 } 803 804 @Override 805 public Work find(String workId, State state) { 806 return queuing.find(workId, state); 807 } 808 809 /** 810 * @param state SCHEDULED, RUNNING or null for both 811 */ 812 protected boolean hasWorkInState(String workId, State state) { 813 return queuing.isWorkInState(workId, state); 814 } 815 816 @Override 817 public State getWorkState(String workId) { 818 return queuing.getWorkState(workId); 819 } 820 821 @Override 822 public List<Work> listWork(String queueId, State state) { 823 // don't return scheduled after commit 824 return queuing.listWork(queueId, state); 825 } 826 827 @Override 828 public List<String> listWorkIds(String queueId, State state) { 829 return queuing.listWorkIds(queueId, state); 830 } 831 832 @Override 833 public WorkQueueMetrics getMetrics(String queueId) { 834 return queuing.metrics(queueId); 835 } 836 837 @Override 838 public int getQueueSize(String queueId, State state) { 839 WorkQueueMetrics metrics = getMetrics(queueId); 840 if (state == null) { 841 return metrics.scheduled.intValue() + metrics.running.intValue(); 842 } 843 if (state == State.SCHEDULED) { 844 return metrics.scheduled.intValue(); 845 } else if (state == State.RUNNING) { 846 return metrics.running.intValue(); 847 } else { 848 throw new IllegalArgumentException(String.valueOf(state)); 849 } 850 } 851 852 @Override 853 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 854 return awaitCompletion(null, duration, unit); 855 } 856 857 @Override 858 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 859 if (!isStarted()) { 860 return true; 861 } 862 SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId)); 863 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 864 long deadline = getTimestampAfter(durationInMs); 865 int pause = (int) Math.min(durationInMs, 500L); 866 log.debug("awaitForCompletion " + durationInMs + " ms"); 867 do { 868 if (noScheduledOrRunningWork(queueId)) { 869 completionSynchronizer.signalCompletedWork(); 870 SequenceTracer.stop("done"); 871 return true; 872 } 873 completionSynchronizer.waitForCompletedWork(pause); 874 } while (System.currentTimeMillis() < deadline); 875 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 876 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 877 return false; 878 } 879 880 protected long getTimestampAfter(long durationInMs) { 881 long ret = System.currentTimeMillis() + durationInMs; 882 if (ret < 0) { 883 ret = Long.MAX_VALUE; 884 } 885 return ret; 886 } 887 888 protected boolean noScheduledOrRunningWork(String queueId) { 889 if (queueId == null) { 890 for (String id : getWorkQueueIds()) { 891 if (!noScheduledOrRunningWork(id)) { 892 return false; 893 } 894 } 895 return true; 896 } 897 if (!isProcessingEnabled(queueId)) { 898 return getExecutor(queueId).runningCount.getCount() == 0L; 899 } 900 if (getQueueSize(queueId, null) > 0) { 901 if (log.isTraceEnabled()) { 902 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: " 903 + getQueueSize(queueId, State.RUNNING)); 904 } 905 return false; 906 } 907 if (log.isTraceEnabled()) { 908 log.trace(queueId + " is completed"); 909 } 910 return true; 911 } 912 913 @Override 914 public boolean isStarted() { 915 return started && !shutdownInProgress; 916 } 917 918}