001/* 002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.logging.SequenceTracer; 048import org.nuxeo.common.utils.ExceptionUtils; 049import org.nuxeo.ecm.core.api.NuxeoException; 050import org.nuxeo.ecm.core.event.EventServiceComponent; 051import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 052import org.nuxeo.ecm.core.work.api.Work; 053import org.nuxeo.ecm.core.work.api.Work.State; 054import org.nuxeo.ecm.core.work.api.WorkManager; 055import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 056import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 057import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 058import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.metrics.MetricsService; 061import org.nuxeo.runtime.metrics.NuxeoMetricSet; 062import org.nuxeo.runtime.model.ComponentContext; 063import org.nuxeo.runtime.model.ComponentInstance; 064import org.nuxeo.runtime.model.ComponentManager; 065import org.nuxeo.runtime.model.DefaultComponent; 066import org.nuxeo.runtime.services.config.ConfigurationService; 067import org.nuxeo.runtime.transaction.TransactionHelper; 068 069import com.codahale.metrics.Counter; 070import com.codahale.metrics.MetricRegistry; 071import com.codahale.metrics.SharedMetricRegistries; 072import com.codahale.metrics.Timer; 073 074/** 075 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 076 * implementation. 077 * 078 * @since 5.6 079 */ 080public class WorkManagerImpl extends DefaultComponent implements WorkManager { 081 082 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 083 084 private static final Log log = LogFactory.getLog(WorkManagerImpl.class); 085 086 protected static final String QUEUES_EP = "queues"; 087 088 protected static final String IMPL_EP = "implementation"; 089 090 public static final String DEFAULT_QUEUE_ID = "default"; 091 092 public static final String DEFAULT_CATEGORY = "default"; 093 094 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 095 096 /** 097 * @since 10.2 098 */ 099 public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms"; 100 101 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 102 103 // @GuardedBy("itself") 104 protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry(); 105 106 protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry(); 107 108 // used synchronized 109 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 110 111 protected WorkQueuing queuing; 112 113 /** 114 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 115 * node in cluster mode. 116 */ 117 protected class WorkCompletionSynchronizer { 118 protected final ReentrantLock lock = new ReentrantLock(); 119 120 protected final Condition condition = lock.newCondition(); 121 122 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 123 lock.lock(); 124 try { 125 return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping 126 } finally { 127 lock.unlock(); 128 } 129 } 130 131 protected void signalCompletedWork() { 132 lock.lock(); 133 try { 134 condition.signalAll(); 135 } finally { 136 lock.unlock(); 137 } 138 } 139 140 } 141 142 protected WorkCompletionSynchronizer completionSynchronizer; 143 144 @Override 145 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 146 if (QUEUES_EP.equals(extensionPoint)) { 147 registerWorkQueueDescriptor((WorkQueueDescriptor) contribution); 148 } else if (IMPL_EP.equals(extensionPoint)) { 149 registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 150 } else { 151 throw new RuntimeException("Unknown extension point: " + extensionPoint); 152 } 153 } 154 155 @Override 156 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 157 if (QUEUES_EP.equals(extensionPoint)) { 158 unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution); 159 } else if (IMPL_EP.equals(extensionPoint)) { 160 unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution); 161 } else { 162 throw new RuntimeException("Unknown extension point: " + extensionPoint); 163 } 164 } 165 166 void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 167 String queueId = workQueueDescriptor.id; 168 if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) { 169 Boolean processing = workQueueDescriptor.processing; 170 if (processing == null) { 171 log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES 172 + " with no processing/queuing"); 173 return; 174 } 175 String what = " processing=" + processing; 176 what += queuing == null ? "" : " queuing=" + queuing; 177 log.info("Setting on all work queues:" + what); 178 // activate/deactivate processing/queuing on all queues 179 List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy 180 for (String id : queueIds) { 181 // add an updated contribution redefining processing/queuing 182 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 183 wqd.id = id; 184 wqd.processing = processing; 185 registerWorkQueueDescriptor(wqd); 186 } 187 return; 188 } 189 workQueueConfig.addContribution(workQueueDescriptor); 190 WorkQueueDescriptor wqd = workQueueConfig.get(queueId); 191 log.info("Registered work queue " + queueId + " " + wqd.toString()); 192 } 193 194 void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) { 195 String id = workQueueDescriptor.id; 196 if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) { 197 return; 198 } 199 workQueueConfig.removeContribution(workQueueDescriptor); 200 log.info("Unregistered work queue " + id); 201 } 202 203 void initializeQueue(WorkQueueDescriptor config) { 204 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 205 throw new IllegalArgumentException("cannot initialize all queues"); 206 } 207 if (queuing.getQueue(config.id) != null) { 208 throw new IllegalStateException("work queue " + config.id + " is already initialized"); 209 } 210 if (executors.containsKey(config.id)) { 211 throw new IllegalStateException("work queue " + config.id + " already have an executor"); 212 } 213 NuxeoBlockingQueue queue = queuing.init(config); 214 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 215 int maxPoolSize = config.getMaxThreads(); 216 WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, 217 queue, threadFactory); 218 // prestart all core threads so that direct additions to the queue 219 // (from another Nuxeo instance) can be seen 220 executor.prestartAllCoreThreads(); 221 executors.put(config.id, executor); 222 log.info("Initialized work queue " + config.id + " " + config.toEffectiveString()); 223 } 224 225 void activateQueue(WorkQueueDescriptor config) { 226 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 227 throw new IllegalArgumentException("cannot activate all queues"); 228 } 229 queuing.setActive(config.id, config.isProcessingEnabled()); 230 log.info("Activated work queue " + config.id + " " + config.toEffectiveString()); 231 // Enable metrics 232 if (config.isProcessingEnabled()) { 233 activateQueueMetrics(config.id); 234 } 235 } 236 237 void deactivateQueue(WorkQueueDescriptor config) { 238 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 239 throw new IllegalArgumentException("cannot deactivate all queues"); 240 } 241 // Disable metrics 242 if (config.isProcessingEnabled()) { 243 deactivateQueueMetrics(config.id); 244 } 245 queuing.setActive(config.id, false); 246 log.info("Deactivated work queue " + config.id); 247 } 248 249 void activateQueueMetrics(String queueId) { 250 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 251 queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled"); 252 queueMetrics.putGauge(() -> getMetrics(queueId).running, "running"); 253 queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed"); 254 queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled"); 255 registry.registerAll(queueMetrics); 256 } 257 258 void deactivateQueueMetrics(String queueId) { 259 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 260 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 261 } 262 263 void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 264 workQueuingConfig.addContribution(descr); 265 } 266 267 void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) { 268 workQueuingConfig.removeContribution(descr); 269 } 270 271 protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) { 272 try { 273 return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 274 } catch (ReflectiveOperationException | SecurityException e) { 275 throw new RuntimeException(e); 276 } 277 } 278 279 @Override 280 public boolean isQueuingEnabled(String queueId) { 281 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 282 return wqd != null && wqd.isQueuingEnabled(); 283 } 284 285 @Override 286 public void enableProcessing(boolean value) { 287 workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value)); 288 } 289 290 @Override 291 public void enableProcessing(String queueId, boolean value) { 292 WorkQueueDescriptor config = workQueueConfig.get(queueId); 293 if (config == null) { 294 throw new IllegalArgumentException("no such queue " + queueId); 295 } 296 if (!value) { 297 deactivateQueue(config); 298 } else { 299 activateQueue(config); 300 } 301 } 302 303 @Override 304 public boolean isProcessingEnabled() { 305 for (String queueId : workQueueConfig.getQueueIds()) { 306 if (queuing.getQueue(queueId).active) { 307 return true; 308 } 309 } 310 return false; 311 } 312 313 @Override 314 public boolean isProcessingEnabled(String queueId) { 315 if (queueId == null) { 316 return isProcessingEnabled(); 317 } 318 return queuing.getQueue(queueId).active; 319 } 320 321 // ----- WorkManager ----- 322 323 @Override 324 public List<String> getWorkQueueIds() { 325 synchronized (workQueueConfig) { 326 return workQueueConfig.getQueueIds(); 327 } 328 } 329 330 @Override 331 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 332 synchronized (workQueueConfig) { 333 return workQueueConfig.get(queueId); 334 } 335 } 336 337 @Override 338 public String getCategoryQueueId(String category) { 339 if (category == null) { 340 category = DEFAULT_CATEGORY; 341 } 342 String queueId = workQueueConfig.getQueueId(category); 343 if (queueId == null) { 344 queueId = DEFAULT_QUEUE_ID; 345 } 346 return queueId; 347 } 348 349 @Override 350 public int getApplicationStartedOrder() { 351 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 352 } 353 354 @Override 355 public void start(ComponentContext context) { 356 init(); 357 } 358 359 @Override 360 public void stop(ComponentContext context) throws InterruptedException { 361 // do nothing: we shutdown the thread pool at first before stopping any component 362 // see ComponentManager.Listener.beforeStop 363 } 364 365 protected volatile boolean started = false; 366 367 protected volatile boolean shutdownInProgress = false; 368 369 @Override 370 public void init() { 371 if (started) { 372 return; 373 } 374 synchronized (this) { 375 if (started) { 376 return; 377 } 378 queuing = newWorkQueuing(workQueuingConfig.klass); 379 completionSynchronizer = new WorkCompletionSynchronizer(); 380 started = true; 381 workQueueConfig.index(); 382 for (String id : workQueueConfig.getQueueIds()) { 383 initializeQueue(workQueueConfig.get(id)); 384 } 385 386 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 387 @Override 388 public void beforeStop(ComponentManager mgr, boolean isStandby) { 389 for (String id : workQueueConfig.getQueueIds()) { 390 deactivateQueue(workQueueConfig.get(id)); 391 } 392 try { 393 if (!shutdown(10, TimeUnit.SECONDS)) { 394 log.error("Some processors are still active"); 395 } 396 } catch (InterruptedException e) { 397 Thread.currentThread().interrupt(); 398 throw new NuxeoException("Interrupted while stopping work manager thread pools", e); 399 } 400 } 401 402 @Override 403 public void afterStart(ComponentManager mgr, boolean isResume) { 404 for (String id : workQueueConfig.getQueueIds()) { 405 activateQueue(workQueueConfig.get(id)); 406 } 407 } 408 409 @Override 410 public void afterStop(ComponentManager mgr, boolean isStandby) { 411 Framework.getRuntime().getComponentManager().removeListener(this); 412 } 413 }); 414 } 415 } 416 417 protected WorkThreadPoolExecutor getExecutor(String queueId) { 418 if (!started) { 419 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 420 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 421 init(); 422 } else { 423 throw new IllegalStateException("Work manager not started, could not access to executors"); 424 } 425 } 426 WorkQueueDescriptor workQueueDescriptor; 427 synchronized (workQueueConfig) { 428 workQueueDescriptor = workQueueConfig.get(queueId); 429 } 430 if (workQueueDescriptor == null) { 431 throw new IllegalArgumentException("No such work queue: " + queueId); 432 } 433 434 return executors.get(queueId); 435 } 436 437 @Override 438 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 439 WorkThreadPoolExecutor executor = getExecutor(queueId); 440 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 441 } 442 443 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 444 throws InterruptedException { 445 // mark executors as shutting down 446 for (WorkThreadPoolExecutor executor : list) { 447 executor.shutdownAndSuspend(); 448 } 449 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 450 // wait until threads termination 451 for (WorkThreadPoolExecutor executor : list) { 452 long t0 = System.currentTimeMillis(); 453 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 454 return false; 455 } 456 timeout -= System.currentTimeMillis() - t0; 457 } 458 return true; 459 } 460 461 /** 462 * @deprecated since 10.2 because unused 463 */ 464 @Deprecated 465 protected long remainingMillis(long t0, long delay) { 466 long d = System.currentTimeMillis() - t0; 467 if (d > delay) { 468 return 0; 469 } 470 return delay - d; 471 } 472 473 /** 474 * @deprecated since 10.2 because unused 475 */ 476 @Deprecated 477 protected synchronized void removeExecutor(String queueId) { 478 executors.remove(queueId); 479 } 480 481 @Override 482 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 483 shutdownInProgress = true; 484 try { 485 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 486 } finally { 487 shutdownInProgress = false; 488 started = false; 489 } 490 } 491 492 /** 493 * A work instance and how to schedule it, for schedule-after-commit. 494 * 495 * @since 5.8 496 */ 497 public class WorkScheduling implements Synchronization { 498 public final Work work; 499 500 public final Scheduling scheduling; 501 502 public WorkScheduling(Work work, Scheduling scheduling) { 503 this.work = work; 504 this.scheduling = scheduling; 505 } 506 507 @Override 508 public void beforeCompletion() { 509 } 510 511 @Override 512 public void afterCompletion(int status) { 513 if (status == Status.STATUS_COMMITTED) { 514 schedule(work, scheduling, false); 515 } else if (status == Status.STATUS_ROLLEDBACK) { 516 work.setWorkInstanceState(State.UNKNOWN); 517 } else { 518 throw new IllegalArgumentException("Unsupported transaction status " + status); 519 } 520 } 521 } 522 523 /** 524 * Creates non-daemon threads at normal priority. 525 */ 526 private static class NamedThreadFactory implements ThreadFactory { 527 528 private final AtomicInteger threadNumber = new AtomicInteger(); 529 530 private final ThreadGroup group; 531 532 private final String prefix; 533 534 public NamedThreadFactory(String prefix) { 535 SecurityManager sm = System.getSecurityManager(); 536 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 537 this.prefix = prefix; 538 } 539 540 @Override 541 public Thread newThread(Runnable r) { 542 String name = prefix + threadNumber.incrementAndGet(); 543 Thread thread = new Thread(group, r, name); 544 // do not set daemon 545 thread.setPriority(Thread.NORM_PRIORITY); 546 thread.setUncaughtExceptionHandler(this::handleUncaughtException); 547 return thread; 548 } 549 550 protected void handleUncaughtException(Thread t, Throwable e) { 551 Log logLocal = LogFactory.getLog(WorkManagerImpl.class); 552 if (e instanceof RejectedExecutionException) { 553 // we are responsible of this exception, we use it during shutdown phase to not run the task taken just 554 // before shutdown due to race condition, so log it as WARN 555 logLocal.warn("Rejected execution error on thread " + t.getName(), e); 556 } else if (ExceptionUtils.hasInterruptedCause(e)) { 557 logLocal.warn("Interrupted error on thread" + t.getName(), e); 558 } else { 559 logLocal.error(String.format("Uncaught error on thread: %s, " 560 + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e); 561 } 562 } 563 564 } 565 566 /** 567 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 568 * <p> 569 * Completed tasks are passed to another queue. 570 * <p> 571 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 572 * persisted, etc). 573 * 574 * @since 5.6 575 */ 576 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 577 578 protected final String queueId; 579 580 /** 581 * List of running Work instances, in order to be able to interrupt them if requested. 582 */ 583 // @GuardedBy("itself") 584 protected final ConcurrentLinkedQueue<Work> running; 585 586 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 587 // Number of work scheduled by this instance 588 protected final Counter scheduledCount; 589 590 // Number of work currently running on this instance 591 protected final Counter runningCount; 592 593 // Number of work completed by this instance 594 protected final Counter completedCount; 595 596 protected final Timer workTimer; 597 598 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 599 NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 600 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 601 queueId = queue.queueId; 602 running = new ConcurrentLinkedQueue<>(); 603 // init metrics 604 scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count")); 605 runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running")); 606 completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed")); 607 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total")); 608 } 609 610 public int getScheduledOrRunningSize() { 611 int ret = 0; 612 for (String queueId : getWorkQueueIds()) { 613 ret += getQueueSize(queueId, null); 614 } 615 return ret; 616 } 617 618 @Override 619 public void execute(Runnable r) { 620 throw new UnsupportedOperationException("use other api"); 621 } 622 623 /** 624 * Executes the given task sometime in the future. 625 * 626 * @param work the work to execute 627 * @see #execute(Runnable) 628 * @deprecated since 10.2 because unused 629 */ 630 @Deprecated 631 public void execute(Work work) { 632 scheduledCount.inc(); 633 submit(work); 634 } 635 636 /** 637 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 638 * directly 639 */ 640 protected void submit(Work work) throws RuntimeException { 641 queuing.workSchedule(queueId, work); 642 } 643 644 @Override 645 protected void beforeExecute(Thread t, Runnable r) { 646 Work work = WorkHolder.getWork(r); 647 if (isShutdown()) { 648 work.setWorkInstanceState(State.SCHEDULED); 649 queuing.workReschedule(queueId, work); 650 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 651 } 652 work.setWorkInstanceState(State.RUNNING); 653 queuing.workRunning(queueId, work); 654 running.add(work); 655 runningCount.inc(); 656 } 657 658 @Override 659 protected void afterExecute(Runnable r, Throwable t) { 660 Work work = WorkHolder.getWork(r); 661 try { 662 if (work.isSuspending()) { 663 log.trace(work + " is suspending, giving up"); 664 return; 665 } 666 if (isShutdown()) { 667 log.trace("rescheduling " + work.getId(), t); 668 work.setWorkInstanceState(State.SCHEDULED); 669 queuing.workReschedule(queueId, work); 670 return; 671 } 672 work.setWorkInstanceState(State.UNKNOWN); 673 queuing.workCompleted(queueId, work); 674 } finally { 675 running.remove(work); 676 runningCount.dec(); 677 completedCount.inc(); 678 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 679 completionSynchronizer.signalCompletedWork(); 680 } 681 } 682 683 /** 684 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 685 * 686 * @throws InterruptedException 687 */ 688 public void shutdownAndSuspend() throws InterruptedException { 689 try { 690 // don't consume the queue anymore 691 deactivateQueueMetrics(queueId); 692 queuing.setActive(queueId, false); 693 // suspend all running work 694 boolean hasRunningWork = false; 695 for (Work work : running) { 696 work.setWorkInstanceSuspending(); 697 log.trace("suspending and rescheduling " + work.getId()); 698 work.setWorkInstanceState(State.SCHEDULED); 699 queuing.workReschedule(queueId, work); 700 hasRunningWork = true; 701 } 702 if (hasRunningWork) { 703 long shutdownDelay = Long.parseLong( 704 Framework.getService(ConfigurationService.class).getProperty(SHUTDOWN_DELAY_MS_KEY, "0")); 705 // sleep for a given amount of time for works to have time to persist their state and stop properly 706 Thread.sleep(shutdownDelay); 707 } 708 shutdownNow(); 709 } finally { 710 executors.remove(queueId); 711 } 712 } 713 714 public void removeScheduled(String workId) { 715 queuing.removeScheduled(queueId, workId); 716 } 717 718 } 719 720 @Override 721 public void schedule(Work work) { 722 schedule(work, Scheduling.ENQUEUE, false); 723 } 724 725 @Override 726 public void schedule(Work work, boolean afterCommit) { 727 schedule(work, Scheduling.ENQUEUE, afterCommit); 728 } 729 730 @Override 731 public void schedule(Work work, Scheduling scheduling) { 732 schedule(work, scheduling, false); 733 } 734 735 @Override 736 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 737 String workId = work.getId(); 738 String queueId = getCategoryQueueId(work.getCategory()); 739 if (!isQueuingEnabled(queueId)) { 740 return; 741 } 742 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 743 return; 744 } 745 work.setWorkInstanceState(State.SCHEDULED); 746 WorkSchedulePath.newInstance(work); 747 switch (scheduling) { 748 case ENQUEUE: 749 break; 750 case CANCEL_SCHEDULED: 751 getExecutor(queueId).removeScheduled(workId); 752 WorkStateHelper.setCanceled(work.getId()); 753 break; 754 case IF_NOT_SCHEDULED: 755 case IF_NOT_RUNNING_OR_SCHEDULED: 756 // TODO disabled for now because hasWorkInState uses isScheduled 757 // which is buggy 758 boolean disabled = Boolean.TRUE.booleanValue(); 759 if (!disabled && hasWorkInState(workId, scheduling.state)) { 760 if (log.isDebugEnabled()) { 761 log.debug("Canceling schedule because found: " + scheduling); 762 } 763 return; 764 765 } 766 break; 767 768 } 769 queuing.workSchedule(queueId, work); 770 } 771 772 /** 773 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 774 * 775 * @since 5.8 776 */ 777 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 778 TransactionManager transactionManager; 779 try { 780 transactionManager = TransactionHelper.lookupTransactionManager(); 781 } catch (NamingException e) { 782 transactionManager = null; 783 } 784 if (transactionManager == null) { 785 if (log.isDebugEnabled()) { 786 log.debug("Not scheduling work after commit because of missing transaction manager: " + work); 787 } 788 return false; 789 } 790 try { 791 Transaction transaction = transactionManager.getTransaction(); 792 if (transaction == null) { 793 if (log.isDebugEnabled()) { 794 log.debug("Not scheduling work after commit because of missing transaction: " + work); 795 } 796 return false; 797 } 798 int status = transaction.getStatus(); 799 if (status == Status.STATUS_ACTIVE) { 800 if (log.isDebugEnabled()) { 801 log.debug("Scheduling work after commit: " + work); 802 } 803 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 804 return true; 805 } else if (status == Status.STATUS_COMMITTED) { 806 // called in afterCompletion, we can schedule immediately 807 if (log.isDebugEnabled()) { 808 log.debug("Scheduling work immediately: " + work); 809 } 810 return false; 811 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 812 if (log.isDebugEnabled()) { 813 log.debug("Cancelling schedule because transaction marked rollback-only: " + work); 814 } 815 return true; 816 } else { 817 if (log.isDebugEnabled()) { 818 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 819 + work); 820 } 821 return false; 822 } 823 } catch (SystemException | RollbackException e) { 824 log.error("Cannot schedule after commit", e); 825 return false; 826 } 827 } 828 829 @Override 830 public Work find(String workId, State state) { 831 return queuing.find(workId, state); 832 } 833 834 /** 835 * @param state SCHEDULED, RUNNING or null for both 836 */ 837 protected boolean hasWorkInState(String workId, State state) { 838 return queuing.isWorkInState(workId, state); 839 } 840 841 @Override 842 public State getWorkState(String workId) { 843 return queuing.getWorkState(workId); 844 } 845 846 @Override 847 public List<Work> listWork(String queueId, State state) { 848 // don't return scheduled after commit 849 return queuing.listWork(queueId, state); 850 } 851 852 @Override 853 public List<String> listWorkIds(String queueId, State state) { 854 return queuing.listWorkIds(queueId, state); 855 } 856 857 @Override 858 public WorkQueueMetrics getMetrics(String queueId) { 859 return queuing.metrics(queueId); 860 } 861 862 @Override 863 public int getQueueSize(String queueId, State state) { 864 WorkQueueMetrics metrics = getMetrics(queueId); 865 if (state == null) { 866 return metrics.scheduled.intValue() + metrics.running.intValue(); 867 } 868 if (state == State.SCHEDULED) { 869 return metrics.scheduled.intValue(); 870 } else if (state == State.RUNNING) { 871 return metrics.running.intValue(); 872 } else { 873 throw new IllegalArgumentException(String.valueOf(state)); 874 } 875 } 876 877 @Override 878 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 879 return awaitCompletion(null, duration, unit); 880 } 881 882 @Override 883 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 884 if (!isStarted()) { 885 return true; 886 } 887 SequenceTracer.start("awaitCompletion on " + (queueId == null ? "all queues" : queueId)); 888 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 889 long deadline = getTimestampAfter(durationInMs); 890 int pause = (int) Math.min(durationInMs, 500L); 891 log.debug("awaitForCompletion " + durationInMs + " ms"); 892 do { 893 if (noScheduledOrRunningWork(queueId)) { 894 completionSynchronizer.signalCompletedWork(); 895 SequenceTracer.stop("done"); 896 return true; 897 } 898 completionSynchronizer.waitForCompletedWork(pause); 899 } while (System.currentTimeMillis() < deadline); 900 log.info("awaitCompletion timeout after " + durationInMs + " ms"); 901 SequenceTracer.destroy("timeout after " + durationInMs + " ms"); 902 return false; 903 } 904 905 protected long getTimestampAfter(long durationInMs) { 906 long ret = System.currentTimeMillis() + durationInMs; 907 if (ret < 0) { 908 ret = Long.MAX_VALUE; 909 } 910 return ret; 911 } 912 913 protected boolean noScheduledOrRunningWork(String queueId) { 914 if (queueId == null) { 915 for (String id : getWorkQueueIds()) { 916 if (!noScheduledOrRunningWork(id)) { 917 return false; 918 } 919 } 920 return true; 921 } 922 if (!isProcessingEnabled(queueId)) { 923 return getExecutor(queueId).runningCount.getCount() == 0L; 924 } 925 if (getQueueSize(queueId, null) > 0) { 926 if (log.isTraceEnabled()) { 927 log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: " 928 + getQueueSize(queueId, State.RUNNING)); 929 } 930 return false; 931 } 932 if (log.isTraceEnabled()) { 933 log.trace(queueId + " is completed"); 934 } 935 return true; 936 } 937 938 @Override 939 public boolean isStarted() { 940 return started && !shutdownInProgress; 941 } 942 943}