001/*
002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import static org.nuxeo.ecm.core.work.api.WorkQueueDescriptor.ALL_QUEUES;
023
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.RejectedExecutionException;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.locks.Condition;
037import java.util.concurrent.locks.ReentrantLock;
038import java.util.stream.Collectors;
039
040import javax.naming.NamingException;
041import javax.transaction.RollbackException;
042import javax.transaction.Status;
043import javax.transaction.Synchronization;
044import javax.transaction.SystemException;
045import javax.transaction.Transaction;
046import javax.transaction.TransactionManager;
047
048import org.apache.commons.logging.Log;
049import org.apache.commons.logging.LogFactory;
050import org.apache.logging.log4j.LogManager;
051import org.apache.logging.log4j.Logger;
052import org.nuxeo.common.utils.ExceptionUtils;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.event.EventServiceComponent;
055import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
056import org.nuxeo.ecm.core.work.api.Work;
057import org.nuxeo.ecm.core.work.api.Work.State;
058import org.nuxeo.ecm.core.work.api.WorkManager;
059import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
060import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
061import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
062import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
063import org.nuxeo.lib.stream.codec.AvroMessageCodec;
064import org.nuxeo.lib.stream.codec.Codec;
065import org.nuxeo.lib.stream.computation.Record;
066import org.nuxeo.lib.stream.log.Name;
067import org.nuxeo.runtime.api.Framework;
068import org.nuxeo.runtime.metrics.MetricsService;
069import org.nuxeo.runtime.metrics.NuxeoMetricSet;
070import org.nuxeo.runtime.model.ComponentContext;
071import org.nuxeo.runtime.model.ComponentInstance;
072import org.nuxeo.runtime.model.ComponentManager;
073import org.nuxeo.runtime.model.DefaultComponent;
074import org.nuxeo.runtime.model.Descriptor;
075import org.nuxeo.runtime.services.config.ConfigurationService;
076import org.nuxeo.runtime.stream.StreamService;
077import org.nuxeo.runtime.transaction.TransactionHelper;
078
079import io.dropwizard.metrics5.Counter;
080import io.dropwizard.metrics5.MetricName;
081import io.dropwizard.metrics5.MetricRegistry;
082import io.dropwizard.metrics5.SharedMetricRegistries;
083import io.dropwizard.metrics5.Timer;
084
085import io.opencensus.common.Scope;
086import io.opencensus.trace.Tracer;
087import io.opencensus.trace.Tracing;
088
089/**
090 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
091 * implementation.
092 *
093 * @since 5.6
094 */
095public class WorkManagerImpl extends DefaultComponent implements WorkManager {
096
    private static final Logger log = LogManager.getLogger(WorkManagerImpl.class);

    /** Name given to this component by the constructor; {@link #setName(String)} ignores any other value. */
    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    /** Extension point receiving the {@link WorkQueueDescriptor} contributions. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point from which {@link #init()} reads the {@link WorkQueuingDescriptor}. */
    protected static final String IMPL_EP = "implementation";

    /** Queue id returned by {@link #getCategoryQueueId(String)} when a category has no mapping. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category used by {@link #getCategoryQueueId(String)} when the caller passes {@code null}. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of the worker thread names; the queue id and a counter are appended to it. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /**
     * Configuration key of the delay, in milliseconds, slept during an executor shutdown when works were running.
     *
     * @since 10.2
     */
    public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms";

    /**
     * Framework property which, when {@code true}, disables processing (see {@link #isProcessingDisabled()}).
     *
     * @since 11.1
     * @deprecated Use {@link #WORKMANAGER_PROCESSING_ENABLED} instead
     */
    @Deprecated(since = "11.2")
    public static final String WORKMANAGER_PROCESSING_DISABLE = "nuxeo.work.processing.disable";

    /**
     * Framework property which, when {@code false}, disables processing (see {@link #isProcessingDisabled()}).
     *
     * @since 11.2
     */
    public static final String WORKMANAGER_PROCESSING_ENABLED = "nuxeo.work.processing.enabled";

    /**
     * The dead letter queue stream name.
     *
     * @since 11.1
     */
    public static final Name DEAD_LETTER_QUEUE = Name.ofUrn("work/dlq");

    // Codec used to obtain the dead letter queue appender
    // @since 11.1
    public static final Codec<Record> DEAD_LETTER_QUEUE_CODEC = new AvroMessageCodec<>(Record.class);

    // Name prefix of the per-queue gauges (scheduled, running, completed, canceled)
    // @since 11.1
    protected static final String GLOBAL_METRIC_PREFIX = "nuxeo.works.global.queue";

    /** Shared metric registry, looked up under the {@link MetricsService} class name. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // used synchronized
    // NOTE(review): only removeExecutor() is synchronized in this file; getExecutor(), shutdown() and
    // WorkThreadPoolExecutor.shutdownAndSuspend() access this plain HashMap without a lock -- confirm.
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Work category to queue id mapping, filled by {@link #index()}. */
    protected final Map<String, String> categoryToQueueId = new HashMap<>();

    /** Queuing implementation, instantiated by {@link #init()}; {@code null} before that. */
    protected WorkQueuing queuing;

    /** While {@code false}, {@link #init()} returns without doing anything. */
    protected boolean active = true;
151
152    public WorkManagerImpl() {
153        // make subclasses (StreamWorkManager) use the same registry
154        super.setName(NAME);
155    }
156
157    @Override
158    public void setName(String name) {
159        // do nothing: name is hardcoded and does not depend on XML configuration
160    }
161
162    /**
163     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
164     * node in cluster mode.
165     */
166    protected static class WorkCompletionSynchronizer {
167        protected final ReentrantLock lock = new ReentrantLock();
168
169        protected final Condition condition = lock.newCondition();
170
171        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
172            lock.lock();
173            try {
174                return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping
175            } finally {
176                lock.unlock();
177            }
178        }
179
180        protected void signalCompletedWork() {
181            lock.lock();
182            try {
183                condition.signalAll();
184            } finally {
185                lock.unlock();
186            }
187        }
188
189    }
190
    // Created by init(); signalled from WorkThreadPoolExecutor.afterExecute() each time a work leaves execution.
    protected WorkCompletionSynchronizer completionSynchronizer;
192
193    @Override
194    public void registerContribution(Object contribution, String xp, ComponentInstance component) {
195        if (QUEUES_EP.equals(xp)) {
196            WorkQueueDescriptor descriptor = (WorkQueueDescriptor) contribution;
197            if (ALL_QUEUES.equals(descriptor.getId())) {
198                Boolean processing = descriptor.processing;
199                if (processing == null) {
200                    log.error("Ignoring work queue descriptor {} with no processing/queuing", ALL_QUEUES);
201                    return;
202                }
203                log.info("Setting on all work queues:{}",
204                        () -> " processing=" + processing + (queuing == null ? "" : " queuing=" + queuing));
205                // activate/deactivate processing on all queues
206                getDescriptors(QUEUES_EP).forEach(d -> {
207                    WorkQueueDescriptor wqd = new WorkQueueDescriptor();
208                    wqd.id = d.getId();
209                    wqd.processing = processing;
210                    register(QUEUES_EP, wqd);
211                });
212            } else {
213                register(QUEUES_EP, descriptor);
214            }
215        } else {
216            super.registerContribution(contribution, xp, component);
217        }
218    }
219
220    void initializeQueue(WorkQueueDescriptor config) {
221        if (ALL_QUEUES.equals(config.id)) {
222            throw new IllegalArgumentException("cannot initialize all queues");
223        }
224        if (queuing.getQueue(config.id) != null) {
225            throw new IllegalStateException("work queue " + config.id + " is already initialized");
226        }
227        if (executors.containsKey(config.id)) {
228            throw new IllegalStateException("work queue " + config.id + " already have an executor");
229        }
230        NuxeoBlockingQueue queue = queuing.init(config);
231        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
232        int maxPoolSize = config.getMaxThreads();
233        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
234                queue, threadFactory);
235        // prestart all core threads so that direct additions to the queue
236        // (from another Nuxeo instance) can be seen
237        executor.prestartAllCoreThreads();
238        executors.put(config.id, executor);
239        log.info("Initialized work queue {}, {}", config.id, config);
240    }
241
242    void activateQueue(WorkQueueDescriptor config) {
243        if (ALL_QUEUES.equals(config.id)) {
244            throw new IllegalArgumentException("cannot activate all queues");
245        }
246        queuing.setActive(config.id, config.isProcessingEnabled());
247        log.info("Activated work queue {}, {}", config.id, config);
248        // Enable metrics
249        if (config.isProcessingEnabled()) {
250            activateQueueMetrics(config.id);
251        }
252    }
253
254    void deactivateQueue(WorkQueueDescriptor config) {
255        if (ALL_QUEUES.equals(config.id)) {
256            throw new IllegalArgumentException("cannot deactivate all queues");
257        }
258        // Disable metrics
259        if (config.isProcessingEnabled()) {
260            deactivateQueueMetrics(config.id);
261        }
262        queuing.setActive(config.id, false);
263        log.info("Deactivated work queue {}", config.id);
264    }
265
266    void activateQueueMetrics(String queueId) {
267        NuxeoMetricSet queueMetrics = new NuxeoMetricSet(
268                MetricName.build(GLOBAL_METRIC_PREFIX).tagged("queue", queueId));
269        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled");
270        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
271        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
272        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
273        registry.registerAll(queueMetrics);
274    }
275
276    void deactivateQueueMetrics(String queueId) {
277        String queueMetricsName = MetricName.build(GLOBAL_METRIC_PREFIX).tagged("queue", queueId).getKey();
278        registry.removeMatching((name, metric) -> name.getKey().startsWith(queueMetricsName));
279    }
280
281    @Override
282    public boolean isQueuingEnabled(String queueId) {
283        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
284        return wqd != null && wqd.isQueuingEnabled();
285    }
286
287    @Override
288    public void enableProcessing(boolean value) {
289        getDescriptors(QUEUES_EP).forEach(d -> enableProcessing(d.getId(), value));
290    }
291
292    @Override
293    public void enableProcessing(String queueId, boolean value) {
294        WorkQueueDescriptor config = getDescriptor(QUEUES_EP, queueId);
295        if (config == null) {
296            throw new IllegalArgumentException("no such queue " + queueId);
297        }
298        if (!value) {
299            if (!queuing.supportsProcessingDisabling()) {
300                log.error("Attempting to disable works processing on a WorkQueuing instance that does not support it. "
301                        + "Works will still be processed. "
302                        + "Disabling works processing to manage distribution finely can be done using Redis or Stream implementations.");
303            }
304            deactivateQueue(config);
305        } else {
306            activateQueue(config);
307        }
308    }
309
310    @Override
311    public boolean isProcessingEnabled() {
312        if (isProcessingDisabled()) {
313            return false;
314        }
315        for (Descriptor d : getDescriptors(QUEUES_EP)) {
316            if (queuing.getQueue(d.getId()).active) {
317                return true;
318            }
319        }
320        return false;
321    }
322
323    protected boolean isProcessingDisabled() {
324        if (Boolean.parseBoolean(Framework.getProperty(WORKMANAGER_PROCESSING_DISABLE, "false"))) {
325            log.warn("nuxeo.work.processing.disable=true is now deprecated, use nuxeo.work.processing.enabled=false instead");
326            return true;
327        }
328        if (Framework.isBooleanPropertyFalse(WORKMANAGER_PROCESSING_ENABLED)) {
329            return true;
330        }
331        return false;
332    }
333
334    @Override
335    public boolean isProcessingEnabled(String queueId) {
336        if (isProcessingDisabled()) {
337            return false;
338        }
339        if (queueId == null) {
340            return isProcessingEnabled();
341        }
342        return queuing.getQueue(queueId).active;
343    }
344
345    // ----- WorkManager -----
346
347    @Override
348    public List<String> getWorkQueueIds() {
349        synchronized (getRegistry()) {
350            return getDescriptors(QUEUES_EP).stream().map(Descriptor::getId).collect(Collectors.toList());
351        }
352    }
353
354    @Override
355    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
356        synchronized (getRegistry()) {
357            return getDescriptor(QUEUES_EP, queueId);
358        }
359    }
360
361    @Override
362    public String getCategoryQueueId(String category) {
363        if (category == null) {
364            category = DEFAULT_CATEGORY;
365        }
366        String queueId = categoryToQueueId.get(category);
367        if (queueId == null) {
368            queueId = DEFAULT_QUEUE_ID;
369        }
370        return queueId;
371    }
372
373    @Override
374    public int getApplicationStartedOrder() {
375        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
376    }
377
378    @Override
379    public void start(ComponentContext context) {
380        super.start(context);
381        initDeadLetterQueueStream();
382        init();
383    }
384
385    protected void initDeadLetterQueueStream() {
386        StreamService service = Framework.getService(StreamService.class);
387        if (service == null) {
388            return;
389        }
390        try {
391            org.nuxeo.lib.stream.log.LogManager logManager = service.getLogManager();
392            if (!logManager.exists(DEAD_LETTER_QUEUE)) {
393                log.info("Initializing dead letter queue to store Work in failure");
394                logManager.createIfNotExists(DEAD_LETTER_QUEUE, 1);
395            }
396            // Needed to initialize the appender with the proper codec
397            logManager.getAppender(DEAD_LETTER_QUEUE, DEAD_LETTER_QUEUE_CODEC);
398        } catch (IllegalArgumentException e) {
399            log.info("No default LogManager found, there will be no dead letter queuing for Work in failure.");
400        }
401    }
402
    /** Set to {@code true} by {@link #init()} and reset to {@code false} at the end of {@code shutdown}. */
    protected volatile boolean started = false;

    /** {@code true} only while {@code shutdown} is running. */
    protected volatile boolean shutdownInProgress = false;
406
    /**
     * Initializes the work manager once: instantiates the {@link WorkQueuing} implementation, indexes the work
     * categories, initializes every contributed queue and registers a {@link ComponentManager.Listener} in charge of
     * activating the queues after the runtime start and of shutting them down before the runtime stop.
     * <p>
     * Does nothing when the manager is already {@link #started} or when {@link #active} is {@code false}.
     *
     * @throws RuntimeException if the queuing implementation cannot be instantiated
     */
    @Override
    public void init() {
        // unsynchronized fast path; started is checked again below while holding the monitor
        if (started || !active) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            // descriptor of the implementation extension point, looked up under the unique descriptor id
            WorkQueuingDescriptor d = getDescriptor(IMPL_EP, Descriptor.UNIQUE_DESCRIPTOR_ID);
            try {
                // the implementation class must declare a constructor taking a Listener
                queuing = d.klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
            } catch (ReflectiveOperationException | SecurityException e) {
                throw new RuntimeException(e);
            }
            completionSynchronizer = new WorkCompletionSynchronizer();
            // NOTE(review): started is published before the queues below are initialized, so a concurrent
            // getExecutor() can see started == true while its executor is not registered yet -- confirm.
            started = true;
            index();
            List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
            for (WorkQueueDescriptor descriptor : descriptors) {
                initializeQueue(descriptor);
            }

            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
                // deactivates every queue, then gives the thread pools 10 seconds to terminate
                @Override
                public void beforeStop(ComponentManager mgr, boolean isStandby) {
                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
                    for (WorkQueueDescriptor descriptor : descriptors) {
                        deactivateQueue(descriptor);
                    }
                    try {
                        if (!shutdown(10, TimeUnit.SECONDS)) {
                            log.error("Some processors are still active");
                        }
                    } catch (InterruptedException e) {
                        // restore the interrupt flag before reporting the failure
                        Thread.currentThread().interrupt();
                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
                    }
                }

                // activates every queue unless processing is disabled by configuration on this node
                @Override
                public void afterStart(ComponentManager mgr, boolean isResume) {
                    if (isProcessingDisabled()) {
                        log.warn("WorkManager processing has been disabled on this node");
                        return;
                    }
                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
                    for (WorkQueueDescriptor descriptor : descriptors) {
                        activateQueue(descriptor);
                    }
                }

                // the listener unregisters itself; a later init() adds a new one
                @Override
                public void afterStop(ComponentManager mgr, boolean isStandby) {
                    Framework.getRuntime().getComponentManager().removeListener(this);
                }
            });
        }
    }
466
467    protected void index() {
468        List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
469        descriptors.forEach(d -> d.categories.forEach(c -> {
470            categoryToQueueId.computeIfPresent(c, (k, v) -> {
471                if (!v.equals(d.getId())) {
472                    log.error("Work category '{}' cannot be assigned to work queue '{}'"
473                            + " because it is already assigned to work queue '{}'", c, d.getId(), v);
474                }
475                return v;
476            });
477            categoryToQueueId.putIfAbsent(c, d.getId());
478        }));
479    }
480
481    protected WorkThreadPoolExecutor getExecutor(String queueId) {
482        if (!started) {
483            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
484                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
485                init();
486            } else {
487                throw new IllegalStateException("Work manager not started, could not access to executors");
488            }
489        }
490        WorkQueueDescriptor workQueueDescriptor;
491        synchronized (getRegistry()) {
492            workQueueDescriptor = getDescriptor(QUEUES_EP, queueId);
493        }
494        if (workQueueDescriptor == null) {
495            throw new IllegalArgumentException("No such work queue: " + queueId);
496        }
497
498        return executors.get(queueId);
499    }
500
501    @Override
502    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
503        WorkThreadPoolExecutor executor = getExecutor(queueId);
504        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
505    }
506
507    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
508            throws InterruptedException {
509        // mark executors as shutting down
510        for (WorkThreadPoolExecutor executor : list) {
511            executor.shutdownAndSuspend();
512        }
513        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
514        // wait until threads termination
515        for (WorkThreadPoolExecutor executor : list) {
516            long t0 = System.currentTimeMillis();
517            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
518                return false;
519            }
520            timeout -= System.currentTimeMillis() - t0;
521        }
522        return true;
523    }
524
525    /**
526     * @deprecated since 10.2 because unused
527     */
528    @Deprecated
529    protected long remainingMillis(long t0, long delay) {
530        long d = System.currentTimeMillis() - t0;
531        if (d > delay) {
532            return 0;
533        }
534        return delay - d;
535    }
536
537    /**
538     * @deprecated since 10.2 because unused
539     */
540    @Deprecated
541    protected synchronized void removeExecutor(String queueId) {
542        executors.remove(queueId);
543    }
544
545    @Override
546    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
547        shutdownInProgress = true;
548        try {
549            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
550        } finally {
551            shutdownInProgress = false;
552            started = false;
553        }
554    }
555
556    /**
557     * A work instance and how to schedule it, for schedule-after-commit.
558     *
559     * @since 5.8
560     */
561    public class WorkScheduling implements Synchronization {
562        public final Work work;
563
564        public final Scheduling scheduling;
565
566        public WorkScheduling(Work work, Scheduling scheduling) {
567            this.work = work;
568            this.scheduling = scheduling;
569        }
570
571        @Override
572        public void beforeCompletion() {
573        }
574
575        @Override
576        public void afterCompletion(int status) {
577            if (status == Status.STATUS_COMMITTED) {
578                schedule(work, scheduling, false);
579            } else if (status == Status.STATUS_ROLLEDBACK) {
580                work.setWorkInstanceState(State.UNKNOWN);
581            } else {
582                throw new IllegalArgumentException("Unsupported transaction status " + status);
583            }
584        }
585    }
586
587    /**
588     * Creates non-daemon threads at normal priority.
589     */
590    private static class NamedThreadFactory implements ThreadFactory {
591
592        private final AtomicInteger threadNumber = new AtomicInteger();
593
594        private final ThreadGroup group;
595
596        private final String prefix;
597
598        public NamedThreadFactory(String prefix) {
599            SecurityManager sm = System.getSecurityManager();
600            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
601            this.prefix = prefix;
602        }
603
604        @Override
605        public Thread newThread(Runnable r) {
606            String name = prefix + threadNumber.incrementAndGet();
607            Thread thread = new Thread(group, r, name);
608            // do not set daemon
609            thread.setPriority(Thread.NORM_PRIORITY);
610            thread.setUncaughtExceptionHandler(this::handleUncaughtException);
611            return thread;
612        }
613
614        protected void handleUncaughtException(Thread t, Throwable e) {
615            Log logLocal = LogFactory.getLog(WorkManagerImpl.class);
616            if (e instanceof RejectedExecutionException) {
617                // we are responsible of this exception, we use it during shutdown phase to not run the task taken just
618                // before shutdown due to race condition, so log it as WARN
619                logLocal.warn("Rejected execution error on thread " + t.getName(), e);
620            } else if (ExceptionUtils.hasInterruptedCause(e)) {
621                logLocal.warn("Interrupted error on thread" + t.getName(), e);
622            } else {
623                logLocal.error(String.format("Uncaught error on thread: %s, "
624                        + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e);
625            }
626        }
627
628    }
629
    /**
     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
     * <p>
     * Completed tasks are passed to another queue.
     * <p>
     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
     * persisted, etc).
     * <p>
     * Works are never handed to {@link #execute(Runnable)}: they go through the {@link WorkQueuing} implementation,
     * and the {@link #beforeExecute(Thread, Runnable)} / {@link #afterExecute(Runnable, Throwable)} hooks keep the
     * queuing state and the metrics up to date.
     *
     * @since 5.6
     */
    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {

        /** Id of the queue served by this executor, taken from the queue given to the constructor. */
        protected final String queueId;

        /**
         * List of running Work instances, in order to be able to interrupt them if requested.
         */
        // @GuardedBy("itself")
        // NOTE(review): this is a ConcurrentLinkedQueue and no synchronized (running) block is visible in this
        // class -- the annotation above looks stale, confirm.
        protected final ConcurrentLinkedQueue<Work> running;

        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
        // Number of work scheduled by this instance
        protected final Counter scheduledCount;

        // Number of work currently running on this instance
        protected final Counter runningCount;

        // Number of work completed by this instance
        protected final Counter completedCount;

        // Duration of the works, updated in afterExecute from the work completion and start times
        protected final Timer workTimer;

        /**
         * Creates the executor and its counters and timer, all tagged with the queue id.
         * <p>
         * The first four parameters and the thread factory are passed as is to {@link ThreadPoolExecutor}.
         *
         * @param queue the work queue consumed by the pool threads, also providing the queue id
         */
        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
            queueId = queue.queueId;
            running = new ConcurrentLinkedQueue<>();
            // init metrics
            scheduledCount = registry.counter(
                    MetricName.build("nuxeo.works.queue.scheduled").tagged("queue", queueId));
            runningCount = registry.counter(
                    MetricName.build("nuxeo.works.queue.running").tagged("queue", queueId));
            completedCount = registry.counter(
                    MetricName.build("nuxeo.works.queue.completed").tagged("queue", queueId));
            workTimer = registry.timer(MetricName.build("nuxeo.works.queue.timer").tagged("queue", queueId));
        }

        /**
         * Sums {@code getQueueSize(queueId, null)} over the ids returned by {@code getWorkQueueIds()}.
         * <p>
         * NOTE(review): the loop variable shadows the {@link #queueId} field, so the total covers every registered
         * queue and not only the one served by this executor -- confirm this is intended.
         */
        public int getScheduledOrRunningSize() {
            int ret = 0;
            for (String queueId : getWorkQueueIds()) {
                ret += getQueueSize(queueId, null);
            }
            return ret;
        }

        /**
         * Not supported: works are scheduled through the queuing implementation.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void execute(Runnable r) {
            throw new UnsupportedOperationException("use other api");
        }

        /**
         * Executes the given task sometime in the future.
         *
         * @param work the work to execute
         * @see #execute(Runnable)
         * @deprecated since 10.2 because unused
         */
        @Deprecated
        public void execute(Work work) {
            scheduledCount.inc();
            submit(work);
        }

        /**
         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
         * directly
         */
        protected void submit(Work work) throws RuntimeException {
            queuing.workSchedule(queueId, work);
        }

        /**
         * Marks the work as running before a pool thread executes it.
         * <p>
         * When the executor is already shut down, the work is put back in the scheduled state instead and a
         * {@link RejectedExecutionException} is thrown so that it does not run.
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Work work = WorkHolder.getWork(r);
            if (isShutdown()) {
                // task taken from the queue just before the shutdown: give it back rather than run it
                work.setWorkInstanceState(State.SCHEDULED);
                queuing.workReschedule(queueId, work);
                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
            }
            work.setWorkInstanceState(State.RUNNING);
            queuing.workRunning(queueId, work);
            running.add(work);
            runningCount.inc();
        }

        /**
         * Records the end of a work execution: the work is either left alone (suspending), rescheduled (executor
         * shut down) or declared completed.
         * <p>
         * The {@code finally} block runs on the three paths, so the running list, the counters, the timer and the
         * completion signal are updated in every case, including when the work was suspended or rescheduled.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Work work = WorkHolder.getWork(r);
            try {
                if (work.isSuspending()) {
                    // a suspending work was already rescheduled by shutdownAndSuspend()
                    log.trace("{} is suspending, giving up", work);
                    return;
                }
                if (isShutdown()) {
                    log.trace("rescheduling {}", work.getId(), t);
                    work.setWorkInstanceState(State.SCHEDULED);
                    queuing.workReschedule(queueId, work);
                    return;
                }
                work.setWorkInstanceState(State.UNKNOWN);
                queuing.workCompleted(queueId, work);
            } finally {
                running.remove(work);
                runningCount.dec();
                completedCount.inc();
                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
                completionSynchronizer.signalCompletedWork();
            }
        }

        /**
         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
         * <p>
         * The queue is deactivated and its gauges removed, every running work is flagged as suspending and
         * rescheduled, then the pool is stopped with {@link #shutdownNow()}. When at least one work was running,
         * the calling thread first sleeps for the delay configured under {@link #SHUTDOWN_DELAY_MS_KEY} (0 by
         * default). The executor is removed from the executors map in all cases.
         *
         * @throws InterruptedException if the calling thread is interrupted during the shutdown delay
         */
        public void shutdownAndSuspend() throws InterruptedException {
            try {
                // don't consume the queue anymore
                deactivateQueueMetrics(queueId);
                queuing.setActive(queueId, false);
                // suspend all running work
                boolean hasRunningWork = false;
                for (Work work : running) {
                    work.setWorkInstanceSuspending();
                    log.trace("suspending and rescheduling {}", work.getId());
                    work.setWorkInstanceState(State.SCHEDULED);
                    queuing.workReschedule(queueId, work);
                    hasRunningWork = true;
                }
                if (hasRunningWork) {
                    long shutdownDelay = Framework.getService(ConfigurationService.class)
                                                  .getLong(SHUTDOWN_DELAY_MS_KEY, 0);
                    // sleep for a given amount of time for works to have time to persist their state and stop properly
                    Thread.sleep(shutdownDelay);
                }
                shutdownNow();
            } finally {
                executors.remove(queueId);
            }
        }

        /**
         * Asks the queuing implementation to remove a scheduled work from the queue of this executor.
         *
         * @param workId the id of the work to remove
         */
        public void removeScheduled(String workId) {
            queuing.removeScheduled(queueId, workId);
        }

    }
784
785    @Override
786    public void schedule(Work work) {
787        schedule(work, Scheduling.ENQUEUE, false);
788    }
789
790    @Override
791    public void schedule(Work work, boolean afterCommit) {
792        schedule(work, Scheduling.ENQUEUE, afterCommit);
793    }
794
795    @Override
796    public void schedule(Work work, Scheduling scheduling) {
797        schedule(work, scheduling, false);
798    }
799
800    @Override
801    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
802        String workId = work.getId();
803        String queueId = getCategoryQueueId(work.getCategory());
804        if (!isQueuingEnabled(queueId)) {
805            return;
806        }
807        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
808            return;
809        }
810        work.setWorkInstanceState(State.SCHEDULED);
811        WorkSchedulePath.newInstance(work);
812        switch (scheduling) {
813        case ENQUEUE:
814            break;
815        case CANCEL_SCHEDULED:
816            getExecutor(queueId).removeScheduled(workId);
817            WorkStateHelper.setCanceled(work.getId());
818            break;
819        case IF_NOT_SCHEDULED:
820        case IF_NOT_RUNNING_OR_SCHEDULED:
821            // TODO disabled for now because hasWorkInState uses isScheduled
822            // which is buggy
823            boolean disabled = Boolean.TRUE.booleanValue();
824            if (!disabled && hasWorkInState(workId, scheduling.state)) {
825                log.debug("Canceling schedule because found: {}", scheduling);
826                return;
827
828            }
829            break;
830
831        }
832        if (work.isGroupJoin()) {
833            log.debug("Submit Work: {} to GroupJoin: {}", work::getId, work::getPartitionKey);
834            WorkStateHelper.addGroupJoinWork(work.getPartitionKey());
835        }
836        queuing.workSchedule(queueId, work);
837    }
838
839    /**
840     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
841     *
842     * @since 5.8
843     */
844    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
845        TransactionManager transactionManager;
846        try {
847            transactionManager = TransactionHelper.lookupTransactionManager();
848        } catch (NamingException e) {
849            transactionManager = null;
850        }
851        if (transactionManager == null) {
852            log.debug("Not scheduling work after commit because of missing transaction manager: {}", work);
853            return false;
854        }
855        try {
856            Transaction transaction = transactionManager.getTransaction();
857            if (transaction == null) {
858                log.debug("Not scheduling work after commit because of missing transaction: {}", work);
859                return false;
860            }
861            int status = transaction.getStatus();
862            if (status == Status.STATUS_ACTIVE) {
863                log.debug("Scheduling work after commit: {}", work);
864                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
865                return true;
866            } else if (status == Status.STATUS_COMMITTED) {
867                // called in afterCompletion, we can schedule immediately
868                log.debug("Scheduling work immediately: {}", work);
869                return false;
870            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
871                log.debug("Cancelling schedule because transaction marked rollback-only: {}", work);
872                return true;
873            } else {
874                log.debug("Not scheduling work after commit because transaction is in status {}: {}", status, work);
875                return false;
876            }
877        } catch (SystemException | RollbackException e) {
878            log.error("Cannot schedule after commit", e);
879            return false;
880        }
881    }
882
883    @Override
884    public Work find(String workId, State state) {
885        return queuing.find(workId, state);
886    }
887
888    /**
889     * @param state SCHEDULED, RUNNING or null for both
890     */
891    protected boolean hasWorkInState(String workId, State state) {
892        return queuing.isWorkInState(workId, state);
893    }
894
895    @Override
896    public State getWorkState(String workId) {
897        return queuing.getWorkState(workId);
898    }
899
900    @Override
901    public List<Work> listWork(String queueId, State state) {
902        // don't return scheduled after commit
903        return queuing.listWork(queueId, state);
904    }
905
906    @Override
907    public List<String> listWorkIds(String queueId, State state) {
908        return queuing.listWorkIds(queueId, state);
909    }
910
911    @Override
912    public WorkQueueMetrics getMetrics(String queueId) {
913        return queuing.metrics(queueId);
914    }
915
916    @Override
917    public int getQueueSize(String queueId, State state) {
918        WorkQueueMetrics metrics = getMetrics(queueId);
919        if (state == null) {
920            return metrics.scheduled.intValue() + metrics.running.intValue();
921        }
922        if (state == State.SCHEDULED) {
923            return metrics.scheduled.intValue();
924        } else if (state == State.RUNNING) {
925            return metrics.running.intValue();
926        } else {
927            throw new IllegalArgumentException(String.valueOf(state));
928        }
929    }
930
931    @Override
932    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
933        return awaitCompletion(null, duration, unit);
934    }
935
936    @Override
937    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
938        if (!isStarted()) {
939            return true;
940        }
941        Tracer tracer = Tracing.getTracer();
942        try (Scope scope = tracer.spanBuilder("workmanager.awaitCompletion").startScopedSpan()) {
943            long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
944            long deadline = getTimestampAfter(durationInMs);
945            int pause = (int) Math.min(durationInMs, 500L);
946            log.debug("awaitForCompletion {} ms", durationInMs);
947            do {
948                if (noScheduledOrRunningWork(queueId)) {
949                    completionSynchronizer.signalCompletedWork();
950                    tracer.getCurrentSpan().setStatus(io.opencensus.trace.Status.OK);
951                    return true;
952                }
953                completionSynchronizer.waitForCompletedWork(pause);
954            } while (System.currentTimeMillis() < deadline);
955            log.info("awaitCompletion timeout after {} ms", durationInMs);
956            tracer.getCurrentSpan().setStatus(io.opencensus.trace.Status.DEADLINE_EXCEEDED);
957            return false;
958        }
959    }
960
961    protected long getTimestampAfter(long durationInMs) {
962        long ret = System.currentTimeMillis() + durationInMs;
963        if (ret < 0) {
964            ret = Long.MAX_VALUE;
965        }
966        return ret;
967    }
968
969    protected boolean noScheduledOrRunningWork(String queueId) {
970        if (queueId == null) {
971            for (String id : getWorkQueueIds()) {
972                if (!noScheduledOrRunningWork(id)) {
973                    return false;
974                }
975            }
976            return true;
977        }
978        if (!isProcessingEnabled(queueId)) {
979            return getExecutor(queueId).runningCount.getCount() == 0L;
980        }
981        if (getQueueSize(queueId, null) > 0) {
982            log.trace("{} not empty, sched: {}, running: {}", () -> queueId,
983                    () -> getQueueSize(queueId, State.SCHEDULED), () -> getQueueSize(queueId, State.RUNNING));
984            return false;
985        }
986        log.trace("{} is completed", queueId);
987        return true;
988    }
989
990    @Override
991    public boolean isStarted() {
992        return started && !shutdownInProgress;
993    }
994
995    @Override
996    public boolean supportsProcessingDisabling() {
997        return queuing.supportsProcessingDisabling();
998    }
999
1000}