001/*
002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import static org.nuxeo.ecm.core.work.api.WorkQueueDescriptor.ALL_QUEUES;
023
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.RejectedExecutionException;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.locks.Condition;
037import java.util.concurrent.locks.ReentrantLock;
038import java.util.stream.Collectors;
039
040import javax.naming.NamingException;
041import javax.transaction.RollbackException;
042import javax.transaction.Status;
043import javax.transaction.Synchronization;
044import javax.transaction.SystemException;
045import javax.transaction.Transaction;
046import javax.transaction.TransactionManager;
047
048import org.apache.commons.logging.Log;
049import org.apache.commons.logging.LogFactory;
050import org.apache.logging.log4j.LogManager;
051import org.apache.logging.log4j.Logger;
052import org.nuxeo.common.logging.SequenceTracer;
053import org.nuxeo.common.utils.ExceptionUtils;
054import org.nuxeo.ecm.core.api.NuxeoException;
055import org.nuxeo.ecm.core.event.EventServiceComponent;
056import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
057import org.nuxeo.ecm.core.work.api.Work;
058import org.nuxeo.ecm.core.work.api.Work.State;
059import org.nuxeo.ecm.core.work.api.WorkManager;
060import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
061import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
062import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
063import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
064import org.nuxeo.runtime.api.Framework;
065import org.nuxeo.runtime.metrics.MetricsService;
066import org.nuxeo.runtime.metrics.NuxeoMetricSet;
067import org.nuxeo.runtime.model.ComponentContext;
068import org.nuxeo.runtime.model.ComponentInstance;
069import org.nuxeo.runtime.model.ComponentManager;
070import org.nuxeo.runtime.model.DefaultComponent;
071import org.nuxeo.runtime.model.Descriptor;
072import org.nuxeo.runtime.services.config.ConfigurationService;
073import org.nuxeo.runtime.transaction.TransactionHelper;
074
075import com.codahale.metrics.Counter;
076import com.codahale.metrics.MetricRegistry;
077import com.codahale.metrics.SharedMetricRegistries;
078import com.codahale.metrics.Timer;
079
080/**
081 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
082 * implementation.
083 *
084 * @since 5.6
085 */
086public class WorkManagerImpl extends DefaultComponent implements WorkManager {
087
    /** Logger of this component. */
    private static final Logger log = LogManager.getLogger(WorkManagerImpl.class);

    /** Component name, forced by the constructor whatever the XML configuration says. */
    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    /** Extension point receiving the {@link WorkQueueDescriptor} contributions. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point receiving the {@link WorkQueuingDescriptor} whose class is instantiated by {@link #init()}. */
    protected static final String IMPL_EP = "implementation";

    /** Queue id returned by {@link #getCategoryQueueId(String)} when a category is not mapped to any queue. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category used by {@link #getCategoryQueueId(String)} when the given category is {@code null}. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of worker thread names; the queue id, a dash and a thread counter are appended to it. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /**
     * Configuration property holding the time, in milliseconds, slept during an executor shutdown after running
     * works have been asked to suspend. Read with a default of {@code "0"}.
     *
     * @since 10.2
     */
    public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms";

    /** Metric registry shared under the {@link MetricsService} class name. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // used synchronized
    // NOTE(review): several accesses in this class (initializeQueue, getExecutor, shutdown, shutdownAndSuspend) do
    // not hold any lock themselves -- confirm that callers provide the synchronization
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Maps a work category to the id of the queue handling it; filled by {@link #index()}. */
    protected final Map<String, String> categoryToQueueId = new HashMap<>();

    /** Queuing implementation, instantiated by {@link #init()}; {@code null} before that. */
    protected WorkQueuing queuing;

    /** When {@code false}, {@link #init()} returns without doing anything. */
    protected boolean active = true;
117
118    public WorkManagerImpl() {
119        // make subclasses (StreamWorkManager) use the same registry
120        super.setName(NAME);
121    }
122
123    @Override
124    public void setName(String name) {
125        // do nothing: name is hardcoded and does not depend on XML configuration
126    }
127
128    /**
129     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
130     * node in cluster mode.
131     */
132    protected class WorkCompletionSynchronizer {
133        protected final ReentrantLock lock = new ReentrantLock();
134
135        protected final Condition condition = lock.newCondition();
136
137        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
138            lock.lock();
139            try {
140                return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping
141            } finally {
142                lock.unlock();
143            }
144        }
145
146        protected void signalCompletedWork() {
147            lock.lock();
148            try {
149                condition.signalAll();
150            } finally {
151                lock.unlock();
152            }
153        }
154
155    }
156
157    protected WorkCompletionSynchronizer completionSynchronizer;
158
159    @Override
160    public void registerContribution(Object contribution, String xp, ComponentInstance component) {
161        if (QUEUES_EP.equals(xp)) {
162            WorkQueueDescriptor descriptor = (WorkQueueDescriptor) contribution;
163            if (ALL_QUEUES.equals(descriptor.getId())) {
164                Boolean processing = descriptor.processing;
165                if (processing == null) {
166                    log.error("Ignoring work queue descriptor {} with no processing/queuing", ALL_QUEUES);
167                    return;
168                }
169                log.info("Setting on all work queues:{}",
170                        () -> " processing=" + processing + (queuing == null ? "" : " queuing=" + queuing));
171                // activate/deactivate processing on all queues
172                getDescriptors(QUEUES_EP).forEach(d -> {
173                    WorkQueueDescriptor wqd = new WorkQueueDescriptor();
174                    wqd.id = d.getId();
175                    wqd.processing = processing;
176                    register(QUEUES_EP, wqd);
177                });
178            } else {
179                register(QUEUES_EP, descriptor);
180            }
181        } else {
182            super.registerContribution(contribution, xp, component);
183        }
184    }
185
186    void initializeQueue(WorkQueueDescriptor config) {
187        if (ALL_QUEUES.equals(config.id)) {
188            throw new IllegalArgumentException("cannot initialize all queues");
189        }
190        if (queuing.getQueue(config.id) != null) {
191            throw new IllegalStateException("work queue " + config.id + " is already initialized");
192        }
193        if (executors.containsKey(config.id)) {
194            throw new IllegalStateException("work queue " + config.id + " already have an executor");
195        }
196        NuxeoBlockingQueue queue = queuing.init(config);
197        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
198        int maxPoolSize = config.getMaxThreads();
199        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
200                queue, threadFactory);
201        // prestart all core threads so that direct additions to the queue
202        // (from another Nuxeo instance) can be seen
203        executor.prestartAllCoreThreads();
204        executors.put(config.id, executor);
205        log.info("Initialized work queue {}, {}", config.id, config);
206    }
207
208    void activateQueue(WorkQueueDescriptor config) {
209        if (ALL_QUEUES.equals(config.id)) {
210            throw new IllegalArgumentException("cannot activate all queues");
211        }
212        queuing.setActive(config.id, config.isProcessingEnabled());
213        log.info("Activated work queue {}, {}",  config.id, config);
214        // Enable metrics
215        if (config.isProcessingEnabled()) {
216            activateQueueMetrics(config.id);
217        }
218    }
219
220    void deactivateQueue(WorkQueueDescriptor config) {
221        if (ALL_QUEUES.equals(config.id)) {
222            throw new IllegalArgumentException("cannot deactivate all queues");
223        }
224        // Disable metrics
225        if (config.isProcessingEnabled()) {
226            deactivateQueueMetrics(config.id);
227        }
228        queuing.setActive(config.id, false);
229        log.info("Deactivated work queue {}", config.id);
230    }
231
232    void activateQueueMetrics(String queueId) {
233        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
234        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled");
235        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
236        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
237        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
238        registry.registerAll(queueMetrics);
239    }
240
241    void deactivateQueueMetrics(String queueId) {
242        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
243        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
244    }
245
246    @Override
247    public boolean isQueuingEnabled(String queueId) {
248        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
249        return wqd != null && wqd.isQueuingEnabled();
250    }
251
252    @Override
253    public void enableProcessing(boolean value) {
254        getDescriptors(QUEUES_EP).forEach(d -> enableProcessing(d.getId(), value));
255    }
256
257    @Override
258    public void enableProcessing(String queueId, boolean value) {
259        WorkQueueDescriptor config = getDescriptor(QUEUES_EP, queueId);
260        if (config == null) {
261            throw new IllegalArgumentException("no such queue " + queueId);
262        }
263        if (!value) {
264            if (!queuing.supportsProcessingDisabling()) {
265                log.error("Attempting to disable works processing on a WorkQueuing instance that does not support it. "
266                        + "Works will still be processed. "
267                        + "Disabling works processing to manage distribution finely can be done using Redis or Stream implementations.");
268            }
269            deactivateQueue(config);
270        } else {
271            activateQueue(config);
272        }
273    }
274
275    @Override
276    public boolean isProcessingEnabled() {
277        for (Descriptor d : getDescriptors(QUEUES_EP)) {
278            if (queuing.getQueue(d.getId()).active) {
279                return true;
280            }
281        }
282        return false;
283    }
284
285    @Override
286    public boolean isProcessingEnabled(String queueId) {
287        if (queueId == null) {
288            return isProcessingEnabled();
289        }
290        return queuing.getQueue(queueId).active;
291    }
292
293    // ----- WorkManager -----
294
295    @Override
296    public List<String> getWorkQueueIds() {
297        synchronized (getRegistry()) {
298            return getDescriptors(QUEUES_EP).stream().map(Descriptor::getId).collect(Collectors.toList());
299        }
300    }
301
302    @Override
303    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
304        synchronized (getRegistry()) {
305            return getDescriptor(QUEUES_EP, queueId);
306        }
307    }
308
309    @Override
310    public String getCategoryQueueId(String category) {
311        if (category == null) {
312            category = DEFAULT_CATEGORY;
313        }
314        String queueId = categoryToQueueId.get(category);
315        if (queueId == null) {
316            queueId = DEFAULT_QUEUE_ID;
317        }
318        return queueId;
319    }
320
321    @Override
322    public int getApplicationStartedOrder() {
323        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
324    }
325
326    @Override
327    public void start(ComponentContext context) {
328        super.start(context);
329        init();
330    }
331
332    protected volatile boolean started = false;
333
334    protected volatile boolean shutdownInProgress = false;
335
336    @Override
337    public void init() {
338        if (started || !active) {
339            return;
340        }
341        synchronized (this) {
342            if (started) {
343                return;
344            }
345            WorkQueuingDescriptor d = getDescriptor(IMPL_EP, Descriptor.UNIQUE_DESCRIPTOR_ID);
346            try {
347                queuing = d.klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
348            } catch (ReflectiveOperationException | SecurityException e) {
349                throw new RuntimeException(e);
350            }
351            completionSynchronizer = new WorkCompletionSynchronizer();
352            started = true;
353            index();
354            List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
355            for (WorkQueueDescriptor descriptor : descriptors) {
356                initializeQueue(descriptor);
357            }
358
359            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
360                @Override
361                public void beforeStop(ComponentManager mgr, boolean isStandby) {
362                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
363                    for (WorkQueueDescriptor descriptor : descriptors) {
364                        deactivateQueue(descriptor);
365                    }
366                    try {
367                        if (!shutdown(10, TimeUnit.SECONDS)) {
368                            log.error("Some processors are still active");
369                        }
370                    } catch (InterruptedException e) {
371                        Thread.currentThread().interrupt();
372                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
373                    }
374                }
375
376                @Override
377                public void afterStart(ComponentManager mgr, boolean isResume) {
378                    List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
379                    for (WorkQueueDescriptor descriptor : descriptors) {
380                        activateQueue(descriptor);
381                    }
382                }
383
384                @Override
385                public void afterStop(ComponentManager mgr, boolean isStandby) {
386                    Framework.getRuntime().getComponentManager().removeListener(this);
387                }
388            });
389        }
390    }
391
392    protected void index() {
393        List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
394        descriptors.forEach(d -> d.categories.forEach(c -> {
395            categoryToQueueId.computeIfPresent(c, (k, v) -> {
396                if (!v.equals(d.getId())) {
397                    log.error("Work category '{}' cannot be assigned to work queue '{}'"
398                            + " because it is already assigned to work queue '{}'", c, d.getId(), v);
399                }
400                return v;
401            });
402            categoryToQueueId.putIfAbsent(c, d.getId());
403        }));
404    }
405
406    protected WorkThreadPoolExecutor getExecutor(String queueId) {
407        if (!started) {
408            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
409                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
410                init();
411            } else {
412                throw new IllegalStateException("Work manager not started, could not access to executors");
413            }
414        }
415        WorkQueueDescriptor workQueueDescriptor;
416        synchronized (getRegistry()) {
417            workQueueDescriptor = getDescriptor(QUEUES_EP, queueId);
418        }
419        if (workQueueDescriptor == null) {
420            throw new IllegalArgumentException("No such work queue: " + queueId);
421        }
422
423        return executors.get(queueId);
424    }
425
426    @Override
427    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
428        WorkThreadPoolExecutor executor = getExecutor(queueId);
429        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
430    }
431
432    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
433            throws InterruptedException {
434        // mark executors as shutting down
435        for (WorkThreadPoolExecutor executor : list) {
436            executor.shutdownAndSuspend();
437        }
438        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
439        // wait until threads termination
440        for (WorkThreadPoolExecutor executor : list) {
441            long t0 = System.currentTimeMillis();
442            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
443                return false;
444            }
445            timeout -= System.currentTimeMillis() - t0;
446        }
447        return true;
448    }
449
450    /**
451     * @deprecated since 10.2 because unused
452     */
453    @Deprecated
454    protected long remainingMillis(long t0, long delay) {
455        long d = System.currentTimeMillis() - t0;
456        if (d > delay) {
457            return 0;
458        }
459        return delay - d;
460    }
461
462    /**
463     * @deprecated since 10.2 because unused
464     */
465    @Deprecated
466    protected synchronized void removeExecutor(String queueId) {
467        executors.remove(queueId);
468    }
469
470    @Override
471    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
472        shutdownInProgress = true;
473        try {
474            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
475        } finally {
476            shutdownInProgress = false;
477            started = false;
478        }
479    }
480
481    /**
482     * A work instance and how to schedule it, for schedule-after-commit.
483     *
484     * @since 5.8
485     */
486    public class WorkScheduling implements Synchronization {
487        public final Work work;
488
489        public final Scheduling scheduling;
490
491        public WorkScheduling(Work work, Scheduling scheduling) {
492            this.work = work;
493            this.scheduling = scheduling;
494        }
495
496        @Override
497        public void beforeCompletion() {
498        }
499
500        @Override
501        public void afterCompletion(int status) {
502            if (status == Status.STATUS_COMMITTED) {
503                schedule(work, scheduling, false);
504            } else if (status == Status.STATUS_ROLLEDBACK) {
505                work.setWorkInstanceState(State.UNKNOWN);
506            } else {
507                throw new IllegalArgumentException("Unsupported transaction status " + status);
508            }
509        }
510    }
511
512    /**
513     * Creates non-daemon threads at normal priority.
514     */
515    private static class NamedThreadFactory implements ThreadFactory {
516
517        private final AtomicInteger threadNumber = new AtomicInteger();
518
519        private final ThreadGroup group;
520
521        private final String prefix;
522
523        public NamedThreadFactory(String prefix) {
524            SecurityManager sm = System.getSecurityManager();
525            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
526            this.prefix = prefix;
527        }
528
529        @Override
530        public Thread newThread(Runnable r) {
531            String name = prefix + threadNumber.incrementAndGet();
532            Thread thread = new Thread(group, r, name);
533            // do not set daemon
534            thread.setPriority(Thread.NORM_PRIORITY);
535            thread.setUncaughtExceptionHandler(this::handleUncaughtException);
536            return thread;
537        }
538
539        protected void handleUncaughtException(Thread t, Throwable e) {
540            Log logLocal = LogFactory.getLog(WorkManagerImpl.class);
541            if (e instanceof RejectedExecutionException) {
542                // we are responsible of this exception, we use it during shutdown phase to not run the task taken just
543                // before shutdown due to race condition, so log it as WARN
544                logLocal.warn("Rejected execution error on thread " + t.getName(), e);
545            } else if (ExceptionUtils.hasInterruptedCause(e)) {
546                logLocal.warn("Interrupted error on thread" + t.getName(), e);
547            } else {
548                logLocal.error(String.format("Uncaught error on thread: %s, "
549                        + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e);
550            }
551        }
552
553    }
554
555    /**
556     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
557     * <p>
558     * Completed tasks are passed to another queue.
559     * <p>
560     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
561     * persisted, etc).
562     *
563     * @since 5.6
564     */
565    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
566
567        protected final String queueId;
568
569        /**
570         * List of running Work instances, in order to be able to interrupt them if requested.
571         */
572        // @GuardedBy("itself")
573        protected final ConcurrentLinkedQueue<Work> running;
574
575        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
576        // Number of work scheduled by this instance
577        protected final Counter scheduledCount;
578
579        // Number of work currently running on this instance
580        protected final Counter runningCount;
581
582        // Number of work completed by this instance
583        protected final Counter completedCount;
584
585        protected final Timer workTimer;
586
587        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
588                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
589            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
590            queueId = queue.queueId;
591            running = new ConcurrentLinkedQueue<>();
592            // init metrics
593            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
594            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
595            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
596            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
597        }
598
599        public int getScheduledOrRunningSize() {
600            int ret = 0;
601            for (String queueId : getWorkQueueIds()) {
602                ret += getQueueSize(queueId, null);
603            }
604            return ret;
605        }
606
607        @Override
608        public void execute(Runnable r) {
609            throw new UnsupportedOperationException("use other api");
610        }
611
612        /**
613         * Executes the given task sometime in the future.
614         *
615         * @param work the work to execute
616         * @see #execute(Runnable)
617         * @deprecated since 10.2 because unused
618         */
619        @Deprecated
620        public void execute(Work work) {
621            scheduledCount.inc();
622            submit(work);
623        }
624
625        /**
626         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
627         * directly
628         */
629        protected void submit(Work work) throws RuntimeException {
630            queuing.workSchedule(queueId, work);
631        }
632
633        @Override
634        protected void beforeExecute(Thread t, Runnable r) {
635            Work work = WorkHolder.getWork(r);
636            if (isShutdown()) {
637                work.setWorkInstanceState(State.SCHEDULED);
638                queuing.workReschedule(queueId, work);
639                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
640            }
641            work.setWorkInstanceState(State.RUNNING);
642            queuing.workRunning(queueId, work);
643            running.add(work);
644            runningCount.inc();
645        }
646
647        @Override
648        protected void afterExecute(Runnable r, Throwable t) {
649            Work work = WorkHolder.getWork(r);
650            try {
651                if (work.isSuspending()) {
652                    log.trace("{} is suspending, giving up", work);
653                    return;
654                }
655                if (isShutdown()) {
656                    log.trace("rescheduling {}", work.getId(), t);
657                    work.setWorkInstanceState(State.SCHEDULED);
658                    queuing.workReschedule(queueId, work);
659                    return;
660                }
661                work.setWorkInstanceState(State.UNKNOWN);
662                queuing.workCompleted(queueId, work);
663            } finally {
664                running.remove(work);
665                runningCount.dec();
666                completedCount.inc();
667                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
668                completionSynchronizer.signalCompletedWork();
669            }
670        }
671
672        /**
673         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
674         *
675         * @throws InterruptedException
676         */
677        public void shutdownAndSuspend() throws InterruptedException {
678            try {
679                // don't consume the queue anymore
680                deactivateQueueMetrics(queueId);
681                queuing.setActive(queueId, false);
682                // suspend all running work
683                boolean hasRunningWork = false;
684                for (Work work : running) {
685                    work.setWorkInstanceSuspending();
686                    log.trace("suspending and rescheduling {}", work.getId());
687                    work.setWorkInstanceState(State.SCHEDULED);
688                    queuing.workReschedule(queueId, work);
689                    hasRunningWork = true;
690                }
691                if (hasRunningWork) {
692                    long shutdownDelay = Long.parseLong(
693                            Framework.getService(ConfigurationService.class).getProperty(SHUTDOWN_DELAY_MS_KEY, "0"));
694                    // sleep for a given amount of time for works to have time to persist their state and stop properly
695                    Thread.sleep(shutdownDelay);
696                }
697                shutdownNow();
698            } finally {
699                executors.remove(queueId);
700            }
701        }
702
703        public void removeScheduled(String workId) {
704            queuing.removeScheduled(queueId, workId);
705        }
706
707    }
708
709    @Override
710    public void schedule(Work work) {
711        schedule(work, Scheduling.ENQUEUE, false);
712    }
713
714    @Override
715    public void schedule(Work work, boolean afterCommit) {
716        schedule(work, Scheduling.ENQUEUE, afterCommit);
717    }
718
719    @Override
720    public void schedule(Work work, Scheduling scheduling) {
721        schedule(work, scheduling, false);
722    }
723
724    @Override
725    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
726        String workId = work.getId();
727        String queueId = getCategoryQueueId(work.getCategory());
728        if (!isQueuingEnabled(queueId)) {
729            return;
730        }
731        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
732            return;
733        }
734        work.setWorkInstanceState(State.SCHEDULED);
735        WorkSchedulePath.newInstance(work);
736        switch (scheduling) {
737        case ENQUEUE:
738            break;
739        case CANCEL_SCHEDULED:
740            getExecutor(queueId).removeScheduled(workId);
741            WorkStateHelper.setCanceled(work.getId());
742            break;
743        case IF_NOT_SCHEDULED:
744        case IF_NOT_RUNNING_OR_SCHEDULED:
745            // TODO disabled for now because hasWorkInState uses isScheduled
746            // which is buggy
747            boolean disabled = Boolean.TRUE.booleanValue();
748            if (!disabled && hasWorkInState(workId, scheduling.state)) {
749                log.debug("Canceling schedule because found: {}", scheduling);
750                return;
751
752            }
753            break;
754
755        }
756        queuing.workSchedule(queueId, work);
757    }
758
759    /**
760     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
761     *
762     * @since 5.8
763     */
764    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
765        TransactionManager transactionManager;
766        try {
767            transactionManager = TransactionHelper.lookupTransactionManager();
768        } catch (NamingException e) {
769            transactionManager = null;
770        }
771        if (transactionManager == null) {
772            log.debug("Not scheduling work after commit because of missing transaction manager: {}", work);
773            return false;
774        }
775        try {
776            Transaction transaction = transactionManager.getTransaction();
777            if (transaction == null) {
778                log.debug("Not scheduling work after commit because of missing transaction: {}", work);
779                return false;
780            }
781            int status = transaction.getStatus();
782            if (status == Status.STATUS_ACTIVE) {
783                log.debug("Scheduling work after commit: {}", work);
784                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
785                return true;
786            } else if (status == Status.STATUS_COMMITTED) {
787                // called in afterCompletion, we can schedule immediately
788                log.debug("Scheduling work immediately: {}", work);
789                return false;
790            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
791                log.debug("Cancelling schedule because transaction marked rollback-only: {}", work);
792                return true;
793            } else {
794                log.debug("Not scheduling work after commit because transaction is in status {}: {}", status, work);
795                return false;
796            }
797        } catch (SystemException | RollbackException e) {
798            log.error("Cannot schedule after commit", e);
799            return false;
800        }
801    }
802
803    @Override
804    public Work find(String workId, State state) {
805        return queuing.find(workId, state);
806    }
807
808    /**
809     * @param state SCHEDULED, RUNNING or null for both
810     */
811    protected boolean hasWorkInState(String workId, State state) {
812        return queuing.isWorkInState(workId, state);
813    }
814
815    @Override
816    public State getWorkState(String workId) {
817        return queuing.getWorkState(workId);
818    }
819
820    @Override
821    public List<Work> listWork(String queueId, State state) {
822        // don't return scheduled after commit
823        return queuing.listWork(queueId, state);
824    }
825
826    @Override
827    public List<String> listWorkIds(String queueId, State state) {
828        return queuing.listWorkIds(queueId, state);
829    }
830
831    @Override
832    public WorkQueueMetrics getMetrics(String queueId) {
833        return queuing.metrics(queueId);
834    }
835
836    @Override
837    public int getQueueSize(String queueId, State state) {
838        WorkQueueMetrics metrics = getMetrics(queueId);
839        if (state == null) {
840            return metrics.scheduled.intValue() + metrics.running.intValue();
841        }
842        if (state == State.SCHEDULED) {
843            return metrics.scheduled.intValue();
844        } else if (state == State.RUNNING) {
845            return metrics.running.intValue();
846        } else {
847            throw new IllegalArgumentException(String.valueOf(state));
848        }
849    }
850
851    @Override
852    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
853        return awaitCompletion(null, duration, unit);
854    }
855
856    @Override
857    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
858        if (!isStarted()) {
859            return true;
860        }
861        SequenceTracer.start("awaitCompletion on " + (queueId == null ? "all queues" : queueId));
862        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
863        long deadline = getTimestampAfter(durationInMs);
864        int pause = (int) Math.min(durationInMs, 500L);
865        log.debug("awaitForCompletion {} ms", durationInMs);
866        do {
867            if (noScheduledOrRunningWork(queueId)) {
868                completionSynchronizer.signalCompletedWork();
869                SequenceTracer.stop("done");
870                return true;
871            }
872            completionSynchronizer.waitForCompletedWork(pause);
873        } while (System.currentTimeMillis() < deadline);
874        log.info("awaitCompletion timeout after {} ms", durationInMs);
875        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
876        return false;
877    }
878
879    protected long getTimestampAfter(long durationInMs) {
880        long ret = System.currentTimeMillis() + durationInMs;
881        if (ret < 0) {
882            ret = Long.MAX_VALUE;
883        }
884        return ret;
885    }
886
887    protected boolean noScheduledOrRunningWork(String queueId) {
888        if (queueId == null) {
889            for (String id : getWorkQueueIds()) {
890                if (!noScheduledOrRunningWork(id)) {
891                    return false;
892                }
893            }
894            return true;
895        }
896        if (!isProcessingEnabled(queueId)) {
897            return getExecutor(queueId).runningCount.getCount() == 0L;
898        }
899        if (getQueueSize(queueId, null) > 0) {
900            log.trace("{} not empty, sched: {}, running: {}", () -> queueId,
901                    () -> getQueueSize(queueId, State.SCHEDULED), () -> getQueueSize(queueId, State.RUNNING));
902            return false;
903        }
904        log.trace("{} is completed", queueId);
905        return true;
906    }
907
908    @Override
909    public boolean isStarted() {
910        return started && !shutdownInProgress;
911    }
912
913    @Override
914    public boolean supportsProcessingDisabling() {
915        return queuing.supportsProcessingDisabling();
916    }
917
918}