001/* 002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.work; 021 022import static org.nuxeo.ecm.core.work.api.WorkQueueDescriptor.ALL_QUEUES; 023 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.RejectedExecutionException; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.locks.Condition; 037import java.util.concurrent.locks.ReentrantLock; 038import java.util.stream.Collectors; 039 040import javax.naming.NamingException; 041import javax.transaction.RollbackException; 042import javax.transaction.Status; 043import javax.transaction.Synchronization; 044import javax.transaction.SystemException; 045import javax.transaction.Transaction; 046import javax.transaction.TransactionManager; 047 048import org.apache.commons.logging.Log; 049import org.apache.commons.logging.LogFactory; 050import org.apache.logging.log4j.LogManager; 051import org.apache.logging.log4j.Logger; 052import org.nuxeo.common.utils.ExceptionUtils; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.event.EventServiceComponent; 055import org.nuxeo.ecm.core.work.WorkQueuing.Listener; 056import org.nuxeo.ecm.core.work.api.Work; 057import org.nuxeo.ecm.core.work.api.Work.State; 058import org.nuxeo.ecm.core.work.api.WorkManager; 059import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 060import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 061import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor; 062import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 063import org.nuxeo.lib.stream.codec.AvroMessageCodec; 064import org.nuxeo.lib.stream.codec.Codec; 065import org.nuxeo.lib.stream.computation.Record; 066import org.nuxeo.lib.stream.log.Name; 067import org.nuxeo.runtime.api.Framework; 068import org.nuxeo.runtime.metrics.MetricsService; 069import org.nuxeo.runtime.metrics.NuxeoMetricSet; 070import org.nuxeo.runtime.model.ComponentContext; 071import org.nuxeo.runtime.model.ComponentInstance; 072import org.nuxeo.runtime.model.ComponentManager; 073import org.nuxeo.runtime.model.DefaultComponent; 074import org.nuxeo.runtime.model.Descriptor; 075import org.nuxeo.runtime.services.config.ConfigurationService; 076import org.nuxeo.runtime.stream.StreamService; 077import org.nuxeo.runtime.transaction.TransactionHelper; 078 079import io.dropwizard.metrics5.Counter; 080import io.dropwizard.metrics5.MetricName; 081import io.dropwizard.metrics5.MetricRegistry; 082import io.dropwizard.metrics5.SharedMetricRegistries; 083import io.dropwizard.metrics5.Timer; 084 085import io.opencensus.common.Scope; 086import io.opencensus.trace.Tracer; 087import io.opencensus.trace.Tracing; 088 089/** 090 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing} 091 * implementation. 092 * 093 * @since 5.6 094 */ 095public class WorkManagerImpl extends DefaultComponent implements WorkManager { 096 097 private static final Logger log = LogManager.getLogger(WorkManagerImpl.class); 098 099 public static final String NAME = "org.nuxeo.ecm.core.work.service"; 100 101 protected static final String QUEUES_EP = "queues"; 102 103 protected static final String IMPL_EP = "implementation"; 104 105 public static final String DEFAULT_QUEUE_ID = "default"; 106 107 public static final String DEFAULT_CATEGORY = "default"; 108 109 protected static final String THREAD_PREFIX = "Nuxeo-Work-"; 110 111 /** 112 * @since 10.2 113 */ 114 public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms"; 115 116 /** 117 * @since 11.1 118 * @deprecated Use {@link #WORKMANAGER_PROCESSING_ENABLED} instead 119 */ 120 @Deprecated(since = "11.2") 121 public static final String WORKMANAGER_PROCESSING_DISABLE = "nuxeo.work.processing.disable"; 122 123 /** 124 * @since 11.2 125 */ 126 public static final String WORKMANAGER_PROCESSING_ENABLED = "nuxeo.work.processing.enabled"; 127 128 /** 129 * The dead letter queue stream name. 130 * 131 * @since 11.1 132 */ 133 public static final Name DEAD_LETTER_QUEUE = Name.ofUrn("work/dlq"); 134 135 // @since 11.1 136 public static final Codec<Record> DEAD_LETTER_QUEUE_CODEC = new AvroMessageCodec<>(Record.class); 137 138 // @since 11.1 139 protected static final String GLOBAL_METRIC_PREFIX = "nuxeo.works.global.queue"; 140 141 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 142 143 // used synchronized 144 protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>(); 145 146 protected final Map<String, String> categoryToQueueId = new HashMap<>(); 147 148 protected WorkQueuing queuing; 149 150 protected boolean active = true; 151 152 public WorkManagerImpl() { 153 // make subclasses (StreamWorkManager) use the same registry 154 super.setName(NAME); 155 } 156 157 @Override 158 public void setName(String name) { 159 // do nothing: name is hardcoded and does not depend on XML configuration 160 } 161 162 /** 163 * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another 164 * node in cluster mode. 165 */ 166 protected static class WorkCompletionSynchronizer { 167 protected final ReentrantLock lock = new ReentrantLock(); 168 169 protected final Condition condition = lock.newCondition(); 170 171 protected boolean waitForCompletedWork(long timeMs) throws InterruptedException { 172 lock.lock(); 173 try { 174 return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping 175 } finally { 176 lock.unlock(); 177 } 178 } 179 180 protected void signalCompletedWork() { 181 lock.lock(); 182 try { 183 condition.signalAll(); 184 } finally { 185 lock.unlock(); 186 } 187 } 188 189 } 190 191 protected WorkCompletionSynchronizer completionSynchronizer; 192 193 @Override 194 public void registerContribution(Object contribution, String xp, ComponentInstance component) { 195 if (QUEUES_EP.equals(xp)) { 196 WorkQueueDescriptor descriptor = (WorkQueueDescriptor) contribution; 197 if (ALL_QUEUES.equals(descriptor.getId())) { 198 Boolean processing = descriptor.processing; 199 if (processing == null) { 200 log.error("Ignoring work queue descriptor {} with no processing/queuing", ALL_QUEUES); 201 return; 202 } 203 log.info("Setting on all work queues:{}", 204 () -> " processing=" + processing + (queuing == null ? "" : " queuing=" + queuing)); 205 // activate/deactivate processing on all queues 206 getDescriptors(QUEUES_EP).forEach(d -> { 207 WorkQueueDescriptor wqd = new WorkQueueDescriptor(); 208 wqd.id = d.getId(); 209 wqd.processing = processing; 210 register(QUEUES_EP, wqd); 211 }); 212 } else { 213 register(QUEUES_EP, descriptor); 214 } 215 } else { 216 super.registerContribution(contribution, xp, component); 217 } 218 } 219 220 void initializeQueue(WorkQueueDescriptor config) { 221 if (ALL_QUEUES.equals(config.id)) { 222 throw new IllegalArgumentException("cannot initialize all queues"); 223 } 224 if (queuing.getQueue(config.id) != null) { 225 throw new IllegalStateException("work queue " + config.id + " is already initialized"); 226 } 227 if (executors.containsKey(config.id)) { 228 throw new IllegalStateException("work queue " + config.id + " already have an executor"); 229 } 230 NuxeoBlockingQueue queue = queuing.init(config); 231 ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-"); 232 int maxPoolSize = config.getMaxThreads(); 233 WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, 234 queue, threadFactory); 235 // prestart all core threads so that direct additions to the queue 236 // (from another Nuxeo instance) can be seen 237 executor.prestartAllCoreThreads(); 238 executors.put(config.id, executor); 239 log.info("Initialized work queue {}, {}", config.id, config); 240 } 241 242 void activateQueue(WorkQueueDescriptor config) { 243 if (ALL_QUEUES.equals(config.id)) { 244 throw new IllegalArgumentException("cannot activate all queues"); 245 } 246 queuing.setActive(config.id, config.isProcessingEnabled()); 247 log.info("Activated work queue {}, {}", config.id, config); 248 // Enable metrics 249 if (config.isProcessingEnabled()) { 250 activateQueueMetrics(config.id); 251 } 252 } 253 254 void deactivateQueue(WorkQueueDescriptor config) { 255 if (ALL_QUEUES.equals(config.id)) { 256 throw new IllegalArgumentException("cannot deactivate all queues"); 257 } 258 // Disable metrics 259 if (config.isProcessingEnabled()) { 260 deactivateQueueMetrics(config.id); 261 } 262 queuing.setActive(config.id, false); 263 log.info("Deactivated work queue {}", config.id); 264 } 265 266 void activateQueueMetrics(String queueId) { 267 NuxeoMetricSet queueMetrics = new NuxeoMetricSet( 268 MetricName.build(GLOBAL_METRIC_PREFIX).tagged("queue", queueId)); 269 queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled"); 270 queueMetrics.putGauge(() -> getMetrics(queueId).running, "running"); 271 queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed"); 272 queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled"); 273 registry.registerAll(queueMetrics); 274 } 275 276 void deactivateQueueMetrics(String queueId) { 277 String queueMetricsName = MetricName.build(GLOBAL_METRIC_PREFIX).tagged("queue", queueId).getKey(); 278 registry.removeMatching((name, metric) -> name.getKey().startsWith(queueMetricsName)); 279 } 280 281 @Override 282 public boolean isQueuingEnabled(String queueId) { 283 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 284 return wqd != null && wqd.isQueuingEnabled(); 285 } 286 287 @Override 288 public void enableProcessing(boolean value) { 289 getDescriptors(QUEUES_EP).forEach(d -> enableProcessing(d.getId(), value)); 290 } 291 292 @Override 293 public void enableProcessing(String queueId, boolean value) { 294 WorkQueueDescriptor config = getDescriptor(QUEUES_EP, queueId); 295 if (config == null) { 296 throw new IllegalArgumentException("no such queue " + queueId); 297 } 298 if (!value) { 299 if (!queuing.supportsProcessingDisabling()) { 300 log.error("Attempting to disable works processing on a WorkQueuing instance that does not support it. " 301 + "Works will still be processed. " 302 + "Disabling works processing to manage distribution finely can be done using Redis or Stream implementations."); 303 } 304 deactivateQueue(config); 305 } else { 306 activateQueue(config); 307 } 308 } 309 310 @Override 311 public boolean isProcessingEnabled() { 312 if (isProcessingDisabled()) { 313 return false; 314 } 315 for (Descriptor d : getDescriptors(QUEUES_EP)) { 316 if (queuing.getQueue(d.getId()).active) { 317 return true; 318 } 319 } 320 return false; 321 } 322 323 protected boolean isProcessingDisabled() { 324 if (Boolean.parseBoolean(Framework.getProperty(WORKMANAGER_PROCESSING_DISABLE, "false"))) { 325 log.warn("nuxeo.work.processing.disable=true is now deprecated, use nuxeo.work.processing.enabled=false instead"); 326 return true; 327 } 328 if (Framework.isBooleanPropertyFalse(WORKMANAGER_PROCESSING_ENABLED)) { 329 return true; 330 } 331 return false; 332 } 333 334 @Override 335 public boolean isProcessingEnabled(String queueId) { 336 if (isProcessingDisabled()) { 337 return false; 338 } 339 if (queueId == null) { 340 return isProcessingEnabled(); 341 } 342 return queuing.getQueue(queueId).active; 343 } 344 345 // ----- WorkManager ----- 346 347 @Override 348 public List<String> getWorkQueueIds() { 349 synchronized (getRegistry()) { 350 return getDescriptors(QUEUES_EP).stream().map(Descriptor::getId).collect(Collectors.toList()); 351 } 352 } 353 354 @Override 355 public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) { 356 synchronized (getRegistry()) { 357 return getDescriptor(QUEUES_EP, queueId); 358 } 359 } 360 361 @Override 362 public String getCategoryQueueId(String category) { 363 if (category == null) { 364 category = DEFAULT_CATEGORY; 365 } 366 String queueId = categoryToQueueId.get(category); 367 if (queueId == null) { 368 queueId = DEFAULT_QUEUE_ID; 369 } 370 return queueId; 371 } 372 373 @Override 374 public int getApplicationStartedOrder() { 375 return EventServiceComponent.APPLICATION_STARTED_ORDER - 1; 376 } 377 378 @Override 379 public void start(ComponentContext context) { 380 super.start(context); 381 initDeadLetterQueueStream(); 382 init(); 383 } 384 385 protected void initDeadLetterQueueStream() { 386 StreamService service = Framework.getService(StreamService.class); 387 if (service == null) { 388 return; 389 } 390 try { 391 org.nuxeo.lib.stream.log.LogManager logManager = service.getLogManager(); 392 if (!logManager.exists(DEAD_LETTER_QUEUE)) { 393 log.info("Initializing dead letter queue to store Work in failure"); 394 logManager.createIfNotExists(DEAD_LETTER_QUEUE, 1); 395 } 396 // Needed to initialize the appender with the proper codec 397 logManager.getAppender(DEAD_LETTER_QUEUE, DEAD_LETTER_QUEUE_CODEC); 398 } catch (IllegalArgumentException e) { 399 log.info("No default LogManager found, there will be no dead letter queuing for Work in failure."); 400 } 401 } 402 403 protected volatile boolean started = false; 404 405 protected volatile boolean shutdownInProgress = false; 406 407 @Override 408 public void init() { 409 if (started || !active) { 410 return; 411 } 412 synchronized (this) { 413 if (started) { 414 return; 415 } 416 WorkQueuingDescriptor d = getDescriptor(IMPL_EP, Descriptor.UNIQUE_DESCRIPTOR_ID); 417 try { 418 queuing = d.klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener()); 419 } catch (ReflectiveOperationException | SecurityException e) { 420 throw new RuntimeException(e); 421 } 422 completionSynchronizer = new WorkCompletionSynchronizer(); 423 started = true; 424 index(); 425 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 426 for (WorkQueueDescriptor descriptor : descriptors) { 427 initializeQueue(descriptor); 428 } 429 430 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 431 @Override 432 public void beforeStop(ComponentManager mgr, boolean isStandby) { 433 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 434 for (WorkQueueDescriptor descriptor : descriptors) { 435 deactivateQueue(descriptor); 436 } 437 try { 438 if (!shutdown(10, TimeUnit.SECONDS)) { 439 log.error("Some processors are still active"); 440 } 441 } catch (InterruptedException e) { 442 Thread.currentThread().interrupt(); 443 throw new NuxeoException("Interrupted while stopping work manager thread pools", e); 444 } 445 } 446 447 @Override 448 public void afterStart(ComponentManager mgr, boolean isResume) { 449 if (isProcessingDisabled()) { 450 log.warn("WorkManager processing has been disabled on this node"); 451 return; 452 } 453 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 454 for (WorkQueueDescriptor descriptor : descriptors) { 455 activateQueue(descriptor); 456 } 457 } 458 459 @Override 460 public void afterStop(ComponentManager mgr, boolean isStandby) { 461 Framework.getRuntime().getComponentManager().removeListener(this); 462 } 463 }); 464 } 465 } 466 467 protected void index() { 468 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 469 descriptors.forEach(d -> d.categories.forEach(c -> { 470 categoryToQueueId.computeIfPresent(c, (k, v) -> { 471 if (!v.equals(d.getId())) { 472 log.error("Work category '{}' cannot be assigned to work queue '{}'" 473 + " because it is already assigned to work queue '{}'", c, d.getId(), v); 474 } 475 return v; 476 }); 477 categoryToQueueId.putIfAbsent(c, d.getId()); 478 })); 479 } 480 481 protected WorkThreadPoolExecutor getExecutor(String queueId) { 482 if (!started) { 483 if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) { 484 LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode"); 485 init(); 486 } else { 487 throw new IllegalStateException("Work manager not started, could not access to executors"); 488 } 489 } 490 WorkQueueDescriptor workQueueDescriptor; 491 synchronized (getRegistry()) { 492 workQueueDescriptor = getDescriptor(QUEUES_EP, queueId); 493 } 494 if (workQueueDescriptor == null) { 495 throw new IllegalArgumentException("No such work queue: " + queueId); 496 } 497 498 return executors.get(queueId); 499 } 500 501 @Override 502 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 503 WorkThreadPoolExecutor executor = getExecutor(queueId); 504 return shutdownExecutors(Collections.singleton(executor), timeout, unit); 505 } 506 507 protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) 508 throws InterruptedException { 509 // mark executors as shutting down 510 for (WorkThreadPoolExecutor executor : list) { 511 executor.shutdownAndSuspend(); 512 } 513 timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); 514 // wait until threads termination 515 for (WorkThreadPoolExecutor executor : list) { 516 long t0 = System.currentTimeMillis(); 517 if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { 518 return false; 519 } 520 timeout -= System.currentTimeMillis() - t0; 521 } 522 return true; 523 } 524 525 /** 526 * @deprecated since 10.2 because unused 527 */ 528 @Deprecated 529 protected long remainingMillis(long t0, long delay) { 530 long d = System.currentTimeMillis() - t0; 531 if (d > delay) { 532 return 0; 533 } 534 return delay - d; 535 } 536 537 /** 538 * @deprecated since 10.2 because unused 539 */ 540 @Deprecated 541 protected synchronized void removeExecutor(String queueId) { 542 executors.remove(queueId); 543 } 544 545 @Override 546 public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException { 547 shutdownInProgress = true; 548 try { 549 return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit); 550 } finally { 551 shutdownInProgress = false; 552 started = false; 553 } 554 } 555 556 /** 557 * A work instance and how to schedule it, for schedule-after-commit. 558 * 559 * @since 5.8 560 */ 561 public class WorkScheduling implements Synchronization { 562 public final Work work; 563 564 public final Scheduling scheduling; 565 566 public WorkScheduling(Work work, Scheduling scheduling) { 567 this.work = work; 568 this.scheduling = scheduling; 569 } 570 571 @Override 572 public void beforeCompletion() { 573 } 574 575 @Override 576 public void afterCompletion(int status) { 577 if (status == Status.STATUS_COMMITTED) { 578 schedule(work, scheduling, false); 579 } else if (status == Status.STATUS_ROLLEDBACK) { 580 work.setWorkInstanceState(State.UNKNOWN); 581 } else { 582 throw new IllegalArgumentException("Unsupported transaction status " + status); 583 } 584 } 585 } 586 587 /** 588 * Creates non-daemon threads at normal priority. 589 */ 590 private static class NamedThreadFactory implements ThreadFactory { 591 592 private final AtomicInteger threadNumber = new AtomicInteger(); 593 594 private final ThreadGroup group; 595 596 private final String prefix; 597 598 public NamedThreadFactory(String prefix) { 599 SecurityManager sm = System.getSecurityManager(); 600 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 601 this.prefix = prefix; 602 } 603 604 @Override 605 public Thread newThread(Runnable r) { 606 String name = prefix + threadNumber.incrementAndGet(); 607 Thread thread = new Thread(group, r, name); 608 // do not set daemon 609 thread.setPriority(Thread.NORM_PRIORITY); 610 thread.setUncaughtExceptionHandler(this::handleUncaughtException); 611 return thread; 612 } 613 614 protected void handleUncaughtException(Thread t, Throwable e) { 615 Log logLocal = LogFactory.getLog(WorkManagerImpl.class); 616 if (e instanceof RejectedExecutionException) { 617 // we are responsible of this exception, we use it during shutdown phase to not run the task taken just 618 // before shutdown due to race condition, so log it as WARN 619 logLocal.warn("Rejected execution error on thread " + t.getName(), e); 620 } else if (ExceptionUtils.hasInterruptedCause(e)) { 621 logLocal.warn("Interrupted error on thread" + t.getName(), e); 622 } else { 623 logLocal.error(String.format("Uncaught error on thread: %s, " 624 + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e); 625 } 626 } 627 628 } 629 630 /** 631 * A {@link ThreadPoolExecutor} that keeps available the list of running tasks. 632 * <p> 633 * Completed tasks are passed to another queue. 634 * <p> 635 * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory, 636 * persisted, etc). 637 * 638 * @since 5.6 639 */ 640 protected class WorkThreadPoolExecutor extends ThreadPoolExecutor { 641 642 protected final String queueId; 643 644 /** 645 * List of running Work instances, in order to be able to interrupt them if requested. 646 */ 647 // @GuardedBy("itself") 648 protected final ConcurrentLinkedQueue<Work> running; 649 650 // metrics, in cluster mode these counters must be aggregated, no logic should rely on them 651 // Number of work scheduled by this instance 652 protected final Counter scheduledCount; 653 654 // Number of work currently running on this instance 655 protected final Counter runningCount; 656 657 // Number of work completed by this instance 658 protected final Counter completedCount; 659 660 protected final Timer workTimer; 661 662 protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 663 NuxeoBlockingQueue queue, ThreadFactory threadFactory) { 664 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory); 665 queueId = queue.queueId; 666 running = new ConcurrentLinkedQueue<>(); 667 // init metrics 668 scheduledCount = registry.counter( 669 MetricName.build("nuxeo.works.queue.scheduled").tagged("queue", queueId)); 670 runningCount = registry.counter( 671 MetricName.build("nuxeo.works.queue.running").tagged("queue", queueId)); 672 completedCount = registry.counter( 673 MetricName.build("nuxeo.works.queue.completed").tagged("queue", queueId)); 674 workTimer = registry.timer(MetricName.build("nuxeo.works.queue.timer").tagged("queue", queueId)); 675 } 676 677 public int getScheduledOrRunningSize() { 678 int ret = 0; 679 for (String queueId : getWorkQueueIds()) { 680 ret += getQueueSize(queueId, null); 681 } 682 return ret; 683 } 684 685 @Override 686 public void execute(Runnable r) { 687 throw new UnsupportedOperationException("use other api"); 688 } 689 690 /** 691 * Executes the given task sometime in the future. 692 * 693 * @param work the work to execute 694 * @see #execute(Runnable) 695 * @deprecated since 10.2 because unused 696 */ 697 @Deprecated 698 public void execute(Work work) { 699 scheduledCount.inc(); 700 submit(work); 701 } 702 703 /** 704 * go through the queue instead of using super.execute which may skip the queue and hand off to a thread 705 * directly 706 */ 707 protected void submit(Work work) throws RuntimeException { 708 queuing.workSchedule(queueId, work); 709 } 710 711 @Override 712 protected void beforeExecute(Thread t, Runnable r) { 713 Work work = WorkHolder.getWork(r); 714 if (isShutdown()) { 715 work.setWorkInstanceState(State.SCHEDULED); 716 queuing.workReschedule(queueId, work); 717 throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work); 718 } 719 work.setWorkInstanceState(State.RUNNING); 720 queuing.workRunning(queueId, work); 721 running.add(work); 722 runningCount.inc(); 723 } 724 725 @Override 726 protected void afterExecute(Runnable r, Throwable t) { 727 Work work = WorkHolder.getWork(r); 728 try { 729 if (work.isSuspending()) { 730 log.trace("{} is suspending, giving up", work); 731 return; 732 } 733 if (isShutdown()) { 734 log.trace("rescheduling {}", work.getId(), t); 735 work.setWorkInstanceState(State.SCHEDULED); 736 queuing.workReschedule(queueId, work); 737 return; 738 } 739 work.setWorkInstanceState(State.UNKNOWN); 740 queuing.workCompleted(queueId, work); 741 } finally { 742 running.remove(work); 743 runningCount.dec(); 744 completedCount.inc(); 745 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 746 completionSynchronizer.signalCompletedWork(); 747 } 748 } 749 750 /** 751 * Initiates a shutdown of this executor and asks for work instances to suspend themselves. 752 */ 753 public void shutdownAndSuspend() throws InterruptedException { 754 try { 755 // don't consume the queue anymore 756 deactivateQueueMetrics(queueId); 757 queuing.setActive(queueId, false); 758 // suspend all running work 759 boolean hasRunningWork = false; 760 for (Work work : running) { 761 work.setWorkInstanceSuspending(); 762 log.trace("suspending and rescheduling {}", work.getId()); 763 work.setWorkInstanceState(State.SCHEDULED); 764 queuing.workReschedule(queueId, work); 765 hasRunningWork = true; 766 } 767 if (hasRunningWork) { 768 long shutdownDelay = Framework.getService(ConfigurationService.class) 769 .getLong(SHUTDOWN_DELAY_MS_KEY, 0); 770 // sleep for a given amount of time for works to have time to persist their state and stop properly 771 Thread.sleep(shutdownDelay); 772 } 773 shutdownNow(); 774 } finally { 775 executors.remove(queueId); 776 } 777 } 778 779 public void removeScheduled(String workId) { 780 queuing.removeScheduled(queueId, workId); 781 } 782 783 } 784 785 @Override 786 public void schedule(Work work) { 787 schedule(work, Scheduling.ENQUEUE, false); 788 } 789 790 @Override 791 public void schedule(Work work, boolean afterCommit) { 792 schedule(work, Scheduling.ENQUEUE, afterCommit); 793 } 794 795 @Override 796 public void schedule(Work work, Scheduling scheduling) { 797 schedule(work, scheduling, false); 798 } 799 800 @Override 801 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 802 String workId = work.getId(); 803 String queueId = getCategoryQueueId(work.getCategory()); 804 if (!isQueuingEnabled(queueId)) { 805 return; 806 } 807 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 808 return; 809 } 810 work.setWorkInstanceState(State.SCHEDULED); 811 WorkSchedulePath.newInstance(work); 812 switch (scheduling) { 813 case ENQUEUE: 814 break; 815 case CANCEL_SCHEDULED: 816 getExecutor(queueId).removeScheduled(workId); 817 WorkStateHelper.setCanceled(work.getId()); 818 break; 819 case IF_NOT_SCHEDULED: 820 case IF_NOT_RUNNING_OR_SCHEDULED: 821 // TODO disabled for now because hasWorkInState uses isScheduled 822 // which is buggy 823 boolean disabled = Boolean.TRUE.booleanValue(); 824 if (!disabled && hasWorkInState(workId, scheduling.state)) { 825 log.debug("Canceling schedule because found: {}", scheduling); 826 return; 827 828 } 829 break; 830 831 } 832 if (work.isGroupJoin()) { 833 log.debug("Submit Work: {} to GroupJoin: {}", work::getId, work::getPartitionKey); 834 WorkStateHelper.addGroupJoinWork(work.getPartitionKey()); 835 } 836 queuing.workSchedule(queueId, work); 837 } 838 839 /** 840 * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager). 841 * 842 * @since 5.8 843 */ 844 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 845 TransactionManager transactionManager; 846 try { 847 transactionManager = TransactionHelper.lookupTransactionManager(); 848 } catch (NamingException e) { 849 transactionManager = null; 850 } 851 if (transactionManager == null) { 852 log.debug("Not scheduling work after commit because of missing transaction manager: {}", work); 853 return false; 854 } 855 try { 856 Transaction transaction = transactionManager.getTransaction(); 857 if (transaction == null) { 858 log.debug("Not scheduling work after commit because of missing transaction: {}", work); 859 return false; 860 } 861 int status = transaction.getStatus(); 862 if (status == Status.STATUS_ACTIVE) { 863 log.debug("Scheduling work after commit: {}", work); 864 transaction.registerSynchronization(new WorkScheduling(work, scheduling)); 865 return true; 866 } else if (status == Status.STATUS_COMMITTED) { 867 // called in afterCompletion, we can schedule immediately 868 log.debug("Scheduling work immediately: {}", work); 869 return false; 870 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 871 log.debug("Cancelling schedule because transaction marked rollback-only: {}", work); 872 return true; 873 } else { 874 log.debug("Not scheduling work after commit because transaction is in status {}: {}", status, work); 875 return false; 876 } 877 } catch (SystemException | RollbackException e) { 878 log.error("Cannot schedule after commit", e); 879 return false; 880 } 881 } 882 883 @Override 884 public Work find(String workId, State state) { 885 return queuing.find(workId, state); 886 } 887 888 /** 889 * @param state SCHEDULED, RUNNING or null for both 890 */ 891 protected boolean hasWorkInState(String workId, State state) { 892 return queuing.isWorkInState(workId, state); 893 } 894 895 @Override 896 public State getWorkState(String workId) { 897 return queuing.getWorkState(workId); 898 } 899 900 @Override 901 public List<Work> listWork(String queueId, State state) { 902 // don't return scheduled after commit 903 return queuing.listWork(queueId, state); 904 } 905 906 @Override 907 public List<String> listWorkIds(String queueId, State state) { 908 return queuing.listWorkIds(queueId, state); 909 } 910 911 @Override 912 public WorkQueueMetrics getMetrics(String queueId) { 913 return queuing.metrics(queueId); 914 } 915 916 @Override 917 public int getQueueSize(String queueId, State state) { 918 WorkQueueMetrics metrics = getMetrics(queueId); 919 if (state == null) { 920 return metrics.scheduled.intValue() + metrics.running.intValue(); 921 } 922 if (state == State.SCHEDULED) { 923 return metrics.scheduled.intValue(); 924 } else if (state == State.RUNNING) { 925 return metrics.running.intValue(); 926 } else { 927 throw new IllegalArgumentException(String.valueOf(state)); 928 } 929 } 930 931 @Override 932 public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException { 933 return awaitCompletion(null, duration, unit); 934 } 935 936 @Override 937 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 938 if (!isStarted()) { 939 return true; 940 } 941 Tracer tracer = Tracing.getTracer(); 942 try (Scope scope = tracer.spanBuilder("workmanager.awaitCompletion").startScopedSpan()) { 943 long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit); 944 long deadline = getTimestampAfter(durationInMs); 945 int pause = (int) Math.min(durationInMs, 500L); 946 log.debug("awaitForCompletion {} ms", durationInMs); 947 do { 948 if (noScheduledOrRunningWork(queueId)) { 949 completionSynchronizer.signalCompletedWork(); 950 tracer.getCurrentSpan().setStatus(io.opencensus.trace.Status.OK); 951 return true; 952 } 953 completionSynchronizer.waitForCompletedWork(pause); 954 } while (System.currentTimeMillis() < deadline); 955 log.info("awaitCompletion timeout after {} ms", durationInMs); 956 tracer.getCurrentSpan().setStatus(io.opencensus.trace.Status.DEADLINE_EXCEEDED); 957 return false; 958 } 959 } 960 961 protected long getTimestampAfter(long durationInMs) { 962 long ret = System.currentTimeMillis() + durationInMs; 963 if (ret < 0) { 964 ret = Long.MAX_VALUE; 965 } 966 return ret; 967 } 968 969 protected boolean noScheduledOrRunningWork(String queueId) { 970 if (queueId == null) { 971 for (String id : getWorkQueueIds()) { 972 if (!noScheduledOrRunningWork(id)) { 973 return false; 974 } 975 } 976 return true; 977 } 978 if (!isProcessingEnabled(queueId)) { 979 return getExecutor(queueId).runningCount.getCount() == 0L; 980 } 981 if (getQueueSize(queueId, null) > 0) { 982 log.trace("{} not empty, sched: {}, running: {}", () -> queueId, 983 () -> getQueueSize(queueId, State.SCHEDULED), () -> getQueueSize(queueId, State.RUNNING)); 984 return false; 985 } 986 log.trace("{} is completed", queueId); 987 return true; 988 } 989 990 @Override 991 public boolean isStarted() { 992 return started && !shutdownInProgress; 993 } 994 995 @Override 996 public boolean supportsProcessingDisabling() { 997 return queuing.supportsProcessingDisabling(); 998 } 999 1000}