/*
 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 *     Benoit Delbosc
 */
package org.nuxeo.ecm.core.work;

import static org.nuxeo.ecm.core.work.api.WorkQueueDescriptor.ALL_QUEUES;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.event.EventServiceComponent;
import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.model.Descriptor;
import org.nuxeo.runtime.services.config.ConfigurationService;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

/**
 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
 * implementation.
 *
 * @since 5.6
 */
public class WorkManagerImpl extends DefaultComponent implements WorkManager {

    private static final Logger log = LogManager.getLogger(WorkManagerImpl.class);

    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    protected static final String QUEUES_EP = "queues";

    protected static final String IMPL_EP = "implementation";

    public static final String DEFAULT_QUEUE_ID = "default";

    public static final String DEFAULT_CATEGORY = "default";

    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /**
     * Configuration property read in {@link WorkThreadPoolExecutor#shutdownAndSuspend()}: number of milliseconds to
     * sleep after suspending running works and before shutting the executor down. Defaults to {@code 0}.
     *
     * @since 10.2
     */
    public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms";

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // used synchronized
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Maps a work category to the id of the queue handling it, filled by {@link #index()}. */
    protected final Map<String, String> categoryToQueueId = new HashMap<>();

    protected WorkQueuing queuing;

    protected boolean active = true;

    public WorkManagerImpl() {
        // make subclasses (StreamWorkManager) use the same registry
        super.setName(NAME);
    }

    @Override
    public void setName(String name) {
        // do nothing: name is hardcoded and does not depend on XML configuration
    }

    /**
     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
     * node in cluster mode.
     */
    protected class WorkCompletionSynchronizer {
        protected final ReentrantLock lock = new ReentrantLock();

        protected final Condition condition = lock.newCondition();

        /**
         * Waits until a completion is signaled or the given delay elapses.
         *
         * @param timeMs the maximum time to wait, in milliseconds
         * @return the result of {@link Condition#await(long, TimeUnit)}: {@code false} if the waiting time elapsed
         * @throws InterruptedException if the current thread is interrupted while waiting
         */
        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
            lock.lock();
            try {
                return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping
            } finally {
                lock.unlock();
            }
        }

        /**
         * Wakes up every thread currently blocked in {@link #waitForCompletedWork(long)}.
         */
        protected void signalCompletedWork() {
            lock.lock();
            try {
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }

    }

    protected WorkCompletionSynchronizer completionSynchronizer;

    /**
     * Registers a contribution. Contributions to the {@value #QUEUES_EP} extension point are handled here, anything
     * else is delegated to the superclass.
     * <p>
     * A queue descriptor whose id is {@link WorkQueueDescriptor#ALL_QUEUES} is not registered itself: its
     * {@code processing} flag is applied to every queue descriptor already registered. It is ignored (with an error
     * log) when it carries no {@code processing} flag.
     */
    @Override
    public void registerContribution(Object contribution, String xp, ComponentInstance component) {
        if (QUEUES_EP.equals(xp)) {
            WorkQueueDescriptor descriptor = (WorkQueueDescriptor) contribution;
            if (ALL_QUEUES.equals(descriptor.getId())) {
                Boolean processing = descriptor.processing;
                if (processing == null) {
                    log.error("Ignoring work queue descriptor {} with no processing/queuing", ALL_QUEUES);
                    return;
                }
                // NOTE(review): "queuing" below resolves to the WorkQueuing field of this component, not to a
                // descriptor flag; it looks like a leftover from an earlier version - confirm the intent.
                log.info("Setting on all work queues:{}",
                        () -> " processing=" + processing + (queuing == null ? "" : " queuing=" + queuing));
                // activate/deactivate processing on all queues
                getDescriptors(QUEUES_EP).forEach(d -> {
                    WorkQueueDescriptor wqd = new WorkQueueDescriptor();
                    wqd.id = d.getId();
                    wqd.processing = processing;
                    register(QUEUES_EP, wqd);
                });
            } else {
                register(QUEUES_EP, descriptor);
            }
        } else {
            super.registerContribution(contribution, xp, component);
        }
    }

    /**
     * Initializes the queue and the thread pool executor of the given queue configuration.
     *
     * @throws IllegalArgumentException if the configuration targets {@link WorkQueueDescriptor#ALL_QUEUES}
     * @throws IllegalStateException if the queue already exists or already has an executor
     */
    void initializeQueue(WorkQueueDescriptor config) {
        if (ALL_QUEUES.equals(config.id)) {
            throw new IllegalArgumentException("cannot initialize all queues");
        }
        if (queuing.getQueue(config.id) != null) {
            throw new IllegalStateException("work queue " + config.id + " is already initialized");
        }
        if (executors.containsKey(config.id)) {
            throw new IllegalStateException("work queue " + config.id + " already have an executor");
        }
        NuxeoBlockingQueue queue = queuing.init(config);
        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
        int maxPoolSize = config.getMaxThreads();
        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
                queue, threadFactory);
        // prestart all core threads so that direct additions to the queue
        // (from another Nuxeo instance) can be seen
        executor.prestartAllCoreThreads();
        executors.put(config.id, executor);
        log.info("Initialized work queue {}, {}", config.id, config);
    }

    /**
     * Sets the active flag of the queue according to the processing flag of its configuration, and registers the
     * queue metrics when processing is enabled.
     *
     * @throws IllegalArgumentException if the configuration targets {@link WorkQueueDescriptor#ALL_QUEUES}
     */
    void activateQueue(WorkQueueDescriptor config) {
        if (ALL_QUEUES.equals(config.id)) {
            throw new IllegalArgumentException("cannot activate all queues");
        }
        queuing.setActive(config.id, config.isProcessingEnabled());
        log.info("Activated work queue {}, {}", config.id, config);
        // Enable metrics
        if (config.isProcessingEnabled()) {
            activateQueueMetrics(config.id);
        }
    }

    /**
     * Marks the queue as inactive and removes its metrics when its configuration has processing enabled.
     *
     * @throws IllegalArgumentException if the configuration targets {@link WorkQueueDescriptor#ALL_QUEUES}
     */
    void deactivateQueue(WorkQueueDescriptor config) {
        if (ALL_QUEUES.equals(config.id)) {
            throw new IllegalArgumentException("cannot deactivate all queues");
        }
        // Disable metrics
        if (config.isProcessingEnabled()) {
            deactivateQueueMetrics(config.id);
        }
        queuing.setActive(config.id, false);
        log.info("Deactivated work queue {}", config.id);
    }

    /**
     * Registers the scheduled / running / completed / canceled gauges of the given queue, backed by
     * {@link #getMetrics(String)}.
     */
    void activateQueueMetrics(String queueId) {
        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled");
        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
        registry.registerAll(queueMetrics);
    }

    /**
     * Removes the metrics registered by {@link #activateQueueMetrics(String)} for the given queue.
     * <p>
     * Only the metric named exactly after the queue prefix, or living under that prefix followed by the {@code .}
     * separator, are removed: a bare prefix match would also drop the metrics of any other queue whose id merely
     * starts with {@code queueId} (for instance {@code default2} when deactivating {@code default}).
     */
    void deactivateQueueMetrics(String queueId) {
        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
        String childPrefix = queueMetricsName + ".";
        registry.removeMatching(
                (name, metric) -> name.equals(queueMetricsName) || name.startsWith(childPrefix));
    }

    /**
     * @return {@code true} if a descriptor exists for the queue and it has queuing enabled
     */
    @Override
    public boolean isQueuingEnabled(String queueId) {
        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
        return wqd != null && wqd.isQueuingEnabled();
    }

    /**
     * Applies {@link #enableProcessing(String, boolean)} to every registered queue.
     */
    @Override
    public void enableProcessing(boolean value) {
        getDescriptors(QUEUES_EP).forEach(d -> enableProcessing(d.getId(), value));
    }

    /**
     * Activates or deactivates the given queue.
     *
     * @throws IllegalArgumentException if no descriptor is registered for {@code queueId}
     */
    @Override
    public void enableProcessing(String queueId, boolean value) {
        WorkQueueDescriptor config = getDescriptor(QUEUES_EP, queueId);
        if (config == null) {
            throw new IllegalArgumentException("no such queue " + queueId);
        }
        if (!value) {
            if (!queuing.supportsProcessingDisabling()) {
                log.error("Attempting to disable works processing on a WorkQueuing instance that does not support it. "
                        + "Works will still be processed. "
                        + "Disabling works processing to manage distribution finely can be done using Redis or Stream implementations.");
            }
            deactivateQueue(config);
        } else {
            activateQueue(config);
        }
    }

    /**
     * @return {@code true} if at least one registered queue is active
     */
    @Override
    public boolean isProcessingEnabled() {
        for (Descriptor d : getDescriptors(QUEUES_EP)) {
            if (isQueueActive(d.getId())) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param queueId the queue id, or {@code null} to check whether any queue is active
     * @return {@code true} if the queue is active, {@code false} if it is inactive or not initialized
     */
    @Override
    public boolean isProcessingEnabled(String queueId) {
        if (queueId == null) {
            return isProcessingEnabled();
        }
        return isQueueActive(queueId);
    }

    /**
     * Null-safe read of the active flag of a queue: a queue unknown to the queuing implementation is reported as
     * inactive instead of failing with a {@link NullPointerException}.
     */
    private boolean isQueueActive(String queueId) {
        NuxeoBlockingQueue queue = queuing.getQueue(queueId);
        return queue != null && queue.active;
    }

    // ----- WorkManager -----

    @Override
    public List<String> getWorkQueueIds() {
        synchronized (getRegistry()) {
            return getDescriptors(QUEUES_EP).stream().map(Descriptor::getId).collect(Collectors.toList());
        }
    }

    @Override
    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
        synchronized (getRegistry()) {
            return getDescriptor(QUEUES_EP, queueId);
        }
    }

    /**
     * Resolves the queue in charge of a category.
     *
     * @param category the work category, {@code null} meaning {@link #DEFAULT_CATEGORY}
     * @return the id of the queue mapped to the category, or {@link #DEFAULT_QUEUE_ID} when there is no mapping
     */
    @Override
    public String getCategoryQueueId(String category) {
        if (category == null) {
            category = DEFAULT_CATEGORY;
        }
        String queueId = categoryToQueueId.get(category);
        if (queueId == null) {
            queueId = DEFAULT_QUEUE_ID;
        }
        return queueId;
    }

    @Override
    public int getApplicationStartedOrder() {
        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
    }

    @Override
    public void start(ComponentContext context) {
        super.start(context);
        init();
    }

    protected volatile boolean started = false;

    protected volatile boolean shutdownInProgress = false;

    /**
     * Instantiates the {@link WorkQueuing} implementation, indexes the categories, initializes every registered queue
     * and installs a {@link ComponentManager.Listener} that activates the queues after start and deactivates / shuts
     * them down before stop.
     * <p>
     * Does nothing if the manager is already started or is not active.
     */
    @Override
    public void init() {
        if (started || !active) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            WorkQueuingDescriptor d = getDescriptor(IMPL_EP, Descriptor.UNIQUE_DESCRIPTOR_ID);
            try {
                queuing = d.klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
            } catch (ReflectiveOperationException | SecurityException e) {
                throw new RuntimeException(e);
            }
            completionSynchronizer = new WorkCompletionSynchronizer();
            started = true;
            index();
            List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
            for (WorkQueueDescriptor descriptor : descriptors) {
                initializeQueue(descriptor);
            }

            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
                @Override
                public void beforeStop(ComponentManager mgr, boolean isStandby) {
                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
                    for (WorkQueueDescriptor descriptor : descriptors) {
                        deactivateQueue(descriptor);
                    }
                    try {
                        if (!shutdown(10, TimeUnit.SECONDS)) {
                            log.error("Some processors are still active");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
                    }
                }

                @Override
                public void afterStart(ComponentManager mgr, boolean isResume) {
                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
                    for (WorkQueueDescriptor descriptor : descriptors) {
                        activateQueue(descriptor);
                    }
                }

                @Override
                public void afterStop(ComponentManager mgr, boolean isStandby) {
                    // "this" is the listener itself: it unregisters once the stop is done
                    Framework.getRuntime().getComponentManager().removeListener(this);
                }
            });
        }
    }

    /**
     * Fills {@link #categoryToQueueId} from the categories of the registered queue descriptors. The first queue
     * declaring a category keeps it; a later queue declaring the same category only triggers an error log.
     */
    protected void index() {
        List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
        descriptors.forEach(d -> d.categories.forEach(c -> {
            categoryToQueueId.computeIfPresent(c, (k, v) -> {
                if (!v.equals(d.getId())) {
                    log.error("Work category '{}' cannot be assigned to work queue '{}'"
                            + " because it is already assigned to work queue '{}'", c, d.getId(), v);
                }
                return v;
            });
            categoryToQueueId.putIfAbsent(c, d.getId());
        }));
    }

    /**
     * Gets the executor of a queue. In test mode, when the runtime is not shutting down, a manager that is not
     * started yet is lazily initialized.
     *
     * @return the executor registered for the queue; may be {@code null} if none is registered in {@link #executors}
     * @throws IllegalStateException if the manager is not started and cannot be lazily started
     * @throws IllegalArgumentException if no descriptor is registered for {@code queueId}
     */
    protected WorkThreadPoolExecutor getExecutor(String queueId) {
        if (!started) {
            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
                init();
            } else {
                throw new IllegalStateException("Work manager not started, could not access to executors");
            }
        }
        WorkQueueDescriptor workQueueDescriptor;
        synchronized (getRegistry()) {
            workQueueDescriptor = getDescriptor(QUEUES_EP, queueId);
        }
        if (workQueueDescriptor == null) {
            throw new IllegalArgumentException("No such work queue: " + queueId);
        }

        return executors.get(queueId);
    }

    @Override
    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
        // NOTE(review): getExecutor returns null once the executor was removed by a previous shutdown - confirm
        // callers never shut the same queue down twice.
        WorkThreadPoolExecutor executor = getExecutor(queueId);
        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
    }

    /**
     * Shuts the given executors down and waits for their termination. The timeout is a global budget shared by all
     * executors: the time spent waiting for one executor is deducted from what is left for the next ones.
     *
     * @return {@code false} as soon as one executor did not terminate within the remaining time, {@code true}
     *         otherwise
     */
    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
            throws InterruptedException {
        // mark executors as shutting down
        for (WorkThreadPoolExecutor executor : list) {
            executor.shutdownAndSuspend();
        }
        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
        // wait until threads termination
        for (WorkThreadPoolExecutor executor : list) {
            long t0 = System.currentTimeMillis();
            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                return false;
            }
            timeout -= System.currentTimeMillis() - t0;
        }
        return true;
    }

    /**
     * @deprecated since 10.2 because unused
     */
    @Deprecated
    protected long remainingMillis(long t0, long delay) {
        long d = System.currentTimeMillis() - t0;
        if (d > delay) {
            return 0;
        }
        return delay - d;
    }

    /**
     * @deprecated since 10.2 because unused
     */
    @Deprecated
    protected synchronized void removeExecutor(String queueId) {
        executors.remove(queueId);
    }

    /**
     * Shuts every executor down. {@link #isStarted()} returns {@code false} while the shutdown is in progress and
     * afterwards, whatever its outcome.
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        shutdownInProgress = true;
        try {
            // iterate on a copy: shutdownAndSuspend removes the executor from the map
            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
        } finally {
            shutdownInProgress = false;
            started = false;
        }
    }

    /**
     * A work instance and how to schedule it, for schedule-after-commit.
     *
     * @since 5.8
     */
    public class WorkScheduling implements Synchronization {
        public final Work work;

        public final Scheduling scheduling;

        public WorkScheduling(Work work, Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        @Override
        public void beforeCompletion() {
        }

        /**
         * Schedules the work on commit, resets its state to {@link State#UNKNOWN} on rollback.
         *
         * @throws IllegalArgumentException for any other transaction status
         */
        @Override
        public void afterCompletion(int status) {
            if (status == Status.STATUS_COMMITTED) {
                schedule(work, scheduling, false);
            } else if (status == Status.STATUS_ROLLEDBACK) {
                work.setWorkInstanceState(State.UNKNOWN);
            } else {
                throw new IllegalArgumentException("Unsupported transaction status " + status);
            }
        }
    }

    /**
     * Creates non-daemon threads at normal priority.
     */
    private static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.setUncaughtExceptionHandler(this::handleUncaughtException);
            return thread;
        }

        /**
         * Logs an exception that escaped a worker thread: rejections and interruptions as warnings, anything else as
         * an error.
         */
        protected void handleUncaughtException(Thread t, Throwable e) {
            Log logLocal = LogFactory.getLog(WorkManagerImpl.class);
            if (e instanceof RejectedExecutionException) {
                // we are responsible of this exception, we use it during shutdown phase to not run the task taken just
                // before shutdown due to race condition, so log it as WARN
                logLocal.warn("Rejected execution error on thread " + t.getName(), e);
            } else if (ExceptionUtils.hasInterruptedCause(e)) {
                logLocal.warn("Interrupted error on thread " + t.getName(), e);
            } else {
                logLocal.error(String.format("Uncaught error on thread: %s, "
                        + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e);
            }
        }

    }

    /**
     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
     * <p>
     * Completed tasks are passed to another queue.
     * <p>
     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
     * persisted, etc).
     *
     * @since 5.6
     */
    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {

        protected final String queueId;

        /**
         * List of running Work instances, in order to be able to interrupt them if requested.
         */
        // @GuardedBy("itself")
        protected final ConcurrentLinkedQueue<Work> running;

        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
        // Number of work scheduled by this instance
        protected final Counter scheduledCount;

        // Number of work currently running on this instance
        protected final Counter runningCount;

        // Number of work completed by this instance
        protected final Counter completedCount;

        protected final Timer workTimer;

        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
            queueId = queue.queueId;
            running = new ConcurrentLinkedQueue<>();
            // init metrics
            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
        }

        /**
         * @return the number of scheduled or running works, summed over all the queues of the work manager
         */
        public int getScheduledOrRunningSize() {
            // NOTE(review): the loop variable shadows the queueId field, so the sum covers every queue and not only
            // the one of this executor - confirm this is intended.
            int ret = 0;
            for (String queueId : getWorkQueueIds()) {
                ret += getQueueSize(queueId, null);
            }
            return ret;
        }

        @Override
        public void execute(Runnable r) {
            throw new UnsupportedOperationException("use other api");
        }

        /**
         * Executes the given task sometime in the future.
         *
         * @param work the work to execute
         * @see #execute(Runnable)
         * @deprecated since 10.2 because unused
         */
        @Deprecated
        public void execute(Work work) {
            scheduledCount.inc();
            submit(work);
        }

        /**
         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
         * directly
         */
        protected void submit(Work work) throws RuntimeException {
            queuing.workSchedule(queueId, work);
        }

        /**
         * Marks the work as running. If the executor has been shut down in the meantime, the work is rescheduled
         * instead and a {@link RejectedExecutionException} is thrown so that it is not run.
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Work work = WorkHolder.getWork(r);
            if (isShutdown()) {
                work.setWorkInstanceState(State.SCHEDULED);
                queuing.workReschedule(queueId, work);
                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
            }
            work.setWorkInstanceState(State.RUNNING);
            queuing.workRunning(queueId, work);
            running.add(work);
            runningCount.inc();
        }

        /**
         * Marks the work as completed, or reschedules it when the executor is shut down. Nothing is done on the
         * queuing side for a suspending work. In every case the running list, the counters and the timer are updated
         * and the threads awaiting completion are signaled.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Work work = WorkHolder.getWork(r);
            try {
                if (work.isSuspending()) {
                    log.trace("{} is suspending, giving up", work);
                    return;
                }
                if (isShutdown()) {
                    log.trace("rescheduling {}", work.getId(), t);
                    work.setWorkInstanceState(State.SCHEDULED);
                    queuing.workReschedule(queueId, work);
                    return;
                }
                work.setWorkInstanceState(State.UNKNOWN);
                queuing.workCompleted(queueId, work);
            } finally {
                running.remove(work);
                runningCount.dec();
                completedCount.inc();
                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
                completionSynchronizer.signalCompletedWork();
            }
        }

        /**
         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
         * <p>
         * The executor is removed from the executors of the work manager whatever the outcome.
         *
         * @throws InterruptedException if interrupted while sleeping for the configured shutdown delay
         */
        public void shutdownAndSuspend() throws InterruptedException {
            try {
                // don't consume the queue anymore
                deactivateQueueMetrics(queueId);
                queuing.setActive(queueId, false);
                // suspend all running work
                boolean hasRunningWork = false;
                for (Work work : running) {
                    work.setWorkInstanceSuspending();
                    log.trace("suspending and rescheduling {}", work.getId());
                    work.setWorkInstanceState(State.SCHEDULED);
                    queuing.workReschedule(queueId, work);
                    hasRunningWork = true;
                }
                if (hasRunningWork) {
                    long shutdownDelay = Long.parseLong(
                            Framework.getService(ConfigurationService.class).getProperty(SHUTDOWN_DELAY_MS_KEY, "0"));
                    // sleep for a given amount of time for works to have time to persist their state and stop properly
                    Thread.sleep(shutdownDelay);
                }
                shutdownNow();
            } finally {
                executors.remove(queueId);
            }
        }

        public void removeScheduled(String workId) {
            queuing.removeScheduled(queueId, workId);
        }

    }

    @Override
    public void schedule(Work work) {
        schedule(work, Scheduling.ENQUEUE, false);
    }

    @Override
    public void schedule(Work work, boolean afterCommit) {
        schedule(work, Scheduling.ENQUEUE, afterCommit);
    }

    @Override
    public void schedule(Work work, Scheduling scheduling) {
        schedule(work, scheduling, false);
    }

    /**
     * Schedules a work on the queue in charge of its category.
     * <p>
     * The work is silently dropped when queuing is disabled on that queue. When {@code afterCommit} is set and a
     * transaction synchronization could be registered (or the transaction is marked rollback-only), nothing more is
     * done here.
     */
    @Override
    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
        String workId = work.getId();
        String queueId = getCategoryQueueId(work.getCategory());
        if (!isQueuingEnabled(queueId)) {
            return;
        }
        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
            return;
        }
        work.setWorkInstanceState(State.SCHEDULED);
        WorkSchedulePath.newInstance(work);
        switch (scheduling) {
        case ENQUEUE:
            break;
        case CANCEL_SCHEDULED:
            getExecutor(queueId).removeScheduled(workId);
            WorkStateHelper.setCanceled(work.getId());
            break;
        case IF_NOT_SCHEDULED:
        case IF_NOT_RUNNING_OR_SCHEDULED:
            // TODO disabled for now because hasWorkInState uses isScheduled
            // which is buggy
            boolean disabled = Boolean.TRUE.booleanValue();
            if (!disabled && hasWorkInState(workId, scheduling.state)) {
                log.debug("Canceling schedule because found: {}", scheduling);
                return;

            }
            break;

        }
        queuing.workSchedule(queueId, work);
    }

    /**
     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
     * <p>
     * Also returns {@code true}, without registering anything, when the transaction is marked rollback-only: the
     * schedule is then cancelled.
     *
     * @since 5.8
     */
    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        } catch (NamingException e) {
            transactionManager = null;
        }
        if (transactionManager == null) {
            log.debug("Not scheduling work after commit because of missing transaction manager: {}", work);
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                log.debug("Not scheduling work after commit because of missing transaction: {}", work);
                return false;
            }
            int status = transaction.getStatus();
            if (status == Status.STATUS_ACTIVE) {
                log.debug("Scheduling work after commit: {}", work);
                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
                return true;
            } else if (status == Status.STATUS_COMMITTED) {
                // called in afterCompletion, we can schedule immediately
                log.debug("Scheduling work immediately: {}", work);
                return false;
            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
                log.debug("Cancelling schedule because transaction marked rollback-only: {}", work);
                return true;
            } else {
                log.debug("Not scheduling work after commit because transaction is in status {}: {}", status, work);
                return false;
            }
        } catch (SystemException | RollbackException e) {
            log.error("Cannot schedule after commit", e);
            return false;
        }
    }

    @Override
    public Work find(String workId, State state) {
        return queuing.find(workId, state);
    }

    /**
     * @param state SCHEDULED, RUNNING or null for both
     */
    protected boolean hasWorkInState(String workId, State state) {
        return queuing.isWorkInState(workId, state);
    }

    @Override
    public State getWorkState(String workId) {
        return queuing.getWorkState(workId);
    }

    @Override
    public List<Work> listWork(String queueId, State state) {
        // don't return scheduled after commit
        return queuing.listWork(queueId, state);
    }

    @Override
    public List<String> listWorkIds(String queueId, State state) {
        return queuing.listWorkIds(queueId, state);
    }

    @Override
    public WorkQueueMetrics getMetrics(String queueId) {
        return queuing.metrics(queueId);
    }

    /**
     * @param state {@link State#SCHEDULED}, {@link State#RUNNING}, or {@code null} for the sum of both
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public int getQueueSize(String queueId, State state) {
        WorkQueueMetrics metrics = getMetrics(queueId);
        if (state == null) {
            return metrics.scheduled.intValue() + metrics.running.intValue();
        }
        if (state == State.SCHEDULED) {
            return metrics.scheduled.intValue();
        } else if (state == State.RUNNING) {
            return metrics.running.intValue();
        } else {
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    @Override
    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
        return awaitCompletion(null, duration, unit);
    }

    /**
     * Waits until the given queue (or all queues when {@code queueId} is {@code null}) has no scheduled or running
     * work, polling at most every 500 ms and on each in-JVM work completion.
     *
     * @return {@code true} if the manager is not started or the completion was observed, {@code false} on timeout
     */
    @Override
    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        SequenceTracer.start("awaitCompletion on " + (queueId == null ? "all queues" : queueId));
        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
        long deadline = getTimestampAfter(durationInMs);
        int pause = (int) Math.min(durationInMs, 500L);
        log.debug("awaitForCompletion {} ms", durationInMs);
        do {
            if (noScheduledOrRunningWork(queueId)) {
                completionSynchronizer.signalCompletedWork();
                SequenceTracer.stop("done");
                return true;
            }
            completionSynchronizer.waitForCompletedWork(pause);
        } while (System.currentTimeMillis() < deadline);
        log.info("awaitCompletion timeout after {} ms", durationInMs);
        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
        return false;
    }

    /**
     * @return the current time plus the given duration, clamped to {@link Long#MAX_VALUE} on overflow
     */
    protected long getTimestampAfter(long durationInMs) {
        long ret = System.currentTimeMillis() + durationInMs;
        if (ret < 0) {
            ret = Long.MAX_VALUE;
        }
        return ret;
    }

    /**
     * Checks that a queue (or every queue when {@code queueId} is {@code null}) has no pending work. When processing
     * is disabled on the queue, only the works running on this instance are taken into account.
     */
    protected boolean noScheduledOrRunningWork(String queueId) {
        if (queueId == null) {
            for (String id : getWorkQueueIds()) {
                if (!noScheduledOrRunningWork(id)) {
                    return false;
                }
            }
            return true;
        }
        if (!isProcessingEnabled(queueId)) {
            return getExecutor(queueId).runningCount.getCount() == 0L;
        }
        if (getQueueSize(queueId, null) > 0) {
            log.trace("{} not empty, sched: {}, running: {}", () -> queueId,
                    () -> getQueueSize(queueId, State.SCHEDULED), () -> getQueueSize(queueId, State.RUNNING));
            return false;
        }
        log.trace("{} is completed", queueId);
        return true;
    }

    @Override
    public boolean isStarted() {
        return started && !shutdownInProgress;
    }

    @Override
    public boolean supportsProcessingDisabling() {
        return queuing.supportsProcessingDisabling();
    }

}