001/*
002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.RejectedExecutionException;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.common.logging.SequenceTracer;
048import org.nuxeo.common.utils.ExceptionUtils;
049import org.nuxeo.ecm.core.api.NuxeoException;
050import org.nuxeo.ecm.core.event.EventServiceComponent;
051import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
052import org.nuxeo.ecm.core.work.api.Work;
053import org.nuxeo.ecm.core.work.api.Work.State;
054import org.nuxeo.ecm.core.work.api.WorkManager;
055import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
056import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
057import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
058import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.metrics.MetricsService;
061import org.nuxeo.runtime.metrics.NuxeoMetricSet;
062import org.nuxeo.runtime.model.ComponentContext;
063import org.nuxeo.runtime.model.ComponentInstance;
064import org.nuxeo.runtime.model.ComponentManager;
065import org.nuxeo.runtime.model.DefaultComponent;
066import org.nuxeo.runtime.services.config.ConfigurationService;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import com.codahale.metrics.Counter;
070import com.codahale.metrics.MetricRegistry;
071import com.codahale.metrics.SharedMetricRegistries;
072import com.codahale.metrics.Timer;
073
074/**
075 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
076 * implementation.
077 *
078 * @since 5.6
079 */
080public class WorkManagerImpl extends DefaultComponent implements WorkManager {
081
082    public static final String NAME = "org.nuxeo.ecm.core.work.service";
083
084    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);
085
086    protected static final String QUEUES_EP = "queues";
087
088    protected static final String IMPL_EP = "implementation";
089
090    public static final String DEFAULT_QUEUE_ID = "default";
091
092    public static final String DEFAULT_CATEGORY = "default";
093
094    protected static final String THREAD_PREFIX = "Nuxeo-Work-";
095
096    /**
097     * @since 10.2
098     */
099    public static final String SHUTDOWN_DELAY_MS_KEY = "nuxeo.work.shutdown.delay.ms";
100
101    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
102
103    // @GuardedBy("itself")
104    protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry();
105
106    protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry();
107
108    // used synchronized
109    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();
110
111    protected WorkQueuing queuing;
112
113    /**
114     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
115     * node in cluster mode.
116     */
117    protected class WorkCompletionSynchronizer {
118        protected final ReentrantLock lock = new ReentrantLock();
119
120        protected final Condition condition = lock.newCondition();
121
122        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
123            lock.lock();
124            try {
125                return condition.await(timeMs, TimeUnit.MILLISECONDS); // caller is looping
126            } finally {
127                lock.unlock();
128            }
129        }
130
131        protected void signalCompletedWork() {
132            lock.lock();
133            try {
134                condition.signalAll();
135            } finally {
136                lock.unlock();
137            }
138        }
139
140    }
141
142    protected WorkCompletionSynchronizer completionSynchronizer;
143
144    @Override
145    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
146        if (QUEUES_EP.equals(extensionPoint)) {
147            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
148        } else if (IMPL_EP.equals(extensionPoint)) {
149            registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
150        } else {
151            throw new RuntimeException("Unknown extension point: " + extensionPoint);
152        }
153    }
154
155    @Override
156    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
157        if (QUEUES_EP.equals(extensionPoint)) {
158            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
159        } else if (IMPL_EP.equals(extensionPoint)) {
160            unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
161        } else {
162            throw new RuntimeException("Unknown extension point: " + extensionPoint);
163        }
164    }
165
166    void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
167        String queueId = workQueueDescriptor.id;
168        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
169            Boolean processing = workQueueDescriptor.processing;
170            if (processing == null) {
171                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
172                        + " with no processing/queuing");
173                return;
174            }
175            String what = " processing=" + processing;
176            what += queuing == null ? "" : " queuing=" + queuing;
177            log.info("Setting on all work queues:" + what);
178            // activate/deactivate processing/queuing on all queues
179            List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy
180            for (String id : queueIds) {
181                // add an updated contribution redefining processing/queuing
182                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
183                wqd.id = id;
184                wqd.processing = processing;
185                registerWorkQueueDescriptor(wqd);
186            }
187            return;
188        }
189        workQueueConfig.addContribution(workQueueDescriptor);
190        WorkQueueDescriptor wqd = workQueueConfig.get(queueId);
191        log.info("Registered work queue " + queueId + " " + wqd.toString());
192    }
193
194    void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
195        String id = workQueueDescriptor.id;
196        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
197            return;
198        }
199        workQueueConfig.removeContribution(workQueueDescriptor);
200        log.info("Unregistered work queue " + id);
201    }
202
203    void initializeQueue(WorkQueueDescriptor config) {
204        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
205            throw new IllegalArgumentException("cannot initialize all queues");
206        }
207        if (queuing.getQueue(config.id) != null) {
208            throw new IllegalStateException("work queue " + config.id + " is already initialized");
209        }
210        if (executors.containsKey(config.id)) {
211            throw new IllegalStateException("work queue " + config.id + " already have an executor");
212        }
213        NuxeoBlockingQueue queue = queuing.init(config);
214        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
215        int maxPoolSize = config.getMaxThreads();
216        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
217                queue, threadFactory);
218        // prestart all core threads so that direct additions to the queue
219        // (from another Nuxeo instance) can be seen
220        executor.prestartAllCoreThreads();
221        executors.put(config.id, executor);
222        log.info("Initialized work queue " + config.id + " " + config.toEffectiveString());
223    }
224
225    void activateQueue(WorkQueueDescriptor config) {
226        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
227            throw new IllegalArgumentException("cannot activate all queues");
228        }
229        queuing.setActive(config.id, config.isProcessingEnabled());
230        log.info("Activated work queue " + config.id + " " + config.toEffectiveString());
231        // Enable metrics
232        if (config.isProcessingEnabled()) {
233            activateQueueMetrics(config.id);
234        }
235    }
236
237    void deactivateQueue(WorkQueueDescriptor config) {
238        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
239            throw new IllegalArgumentException("cannot deactivate all queues");
240        }
241        // Disable metrics
242        if (config.isProcessingEnabled()) {
243            deactivateQueueMetrics(config.id);
244        }
245        queuing.setActive(config.id, false);
246        log.info("Deactivated work queue " + config.id);
247    }
248
249    void activateQueueMetrics(String queueId) {
250        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
251        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled");
252        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
253        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
254        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
255        registry.registerAll(queueMetrics);
256    }
257
258    void deactivateQueueMetrics(String queueId) {
259        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
260        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
261    }
262
263    void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
264        workQueuingConfig.addContribution(descr);
265    }
266
267    void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
268        workQueuingConfig.removeContribution(descr);
269    }
270
271    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
272        try {
273            return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
274        } catch (ReflectiveOperationException | SecurityException e) {
275            throw new RuntimeException(e);
276        }
277    }
278
279    @Override
280    public boolean isQueuingEnabled(String queueId) {
281        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
282        return wqd != null && wqd.isQueuingEnabled();
283    }
284
285    @Override
286    public void enableProcessing(boolean value) {
287        workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value));
288    }
289
290    @Override
291    public void enableProcessing(String queueId, boolean value) {
292        WorkQueueDescriptor config = workQueueConfig.get(queueId);
293        if (config == null) {
294            throw new IllegalArgumentException("no such queue " + queueId);
295        }
296        if (!value) {
297            deactivateQueue(config);
298        } else {
299            activateQueue(config);
300        }
301    }
302
303    @Override
304    public boolean isProcessingEnabled() {
305        for (String queueId : workQueueConfig.getQueueIds()) {
306            if (queuing.getQueue(queueId).active) {
307                return true;
308            }
309        }
310        return false;
311    }
312
313    @Override
314    public boolean isProcessingEnabled(String queueId) {
315        if (queueId == null) {
316            return isProcessingEnabled();
317        }
318        return queuing.getQueue(queueId).active;
319    }
320
321    // ----- WorkManager -----
322
323    @Override
324    public List<String> getWorkQueueIds() {
325        synchronized (workQueueConfig) {
326            return workQueueConfig.getQueueIds();
327        }
328    }
329
330    @Override
331    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
332        synchronized (workQueueConfig) {
333            return workQueueConfig.get(queueId);
334        }
335    }
336
337    @Override
338    public String getCategoryQueueId(String category) {
339        if (category == null) {
340            category = DEFAULT_CATEGORY;
341        }
342        String queueId = workQueueConfig.getQueueId(category);
343        if (queueId == null) {
344            queueId = DEFAULT_QUEUE_ID;
345        }
346        return queueId;
347    }
348
349    @Override
350    public int getApplicationStartedOrder() {
351        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
352    }
353
354    @Override
355    public void start(ComponentContext context) {
356        init();
357    }
358
359    @Override
360    public void stop(ComponentContext context) throws InterruptedException {
361        // do nothing: we shutdown the thread pool at first before stopping any component
362        // see ComponentManager.Listener.beforeStop
363    }
364
365    protected volatile boolean started = false;
366
367    protected volatile boolean shutdownInProgress = false;
368
369    @Override
370    public void init() {
371        if (started) {
372            return;
373        }
374        synchronized (this) {
375            if (started) {
376                return;
377            }
378            queuing = newWorkQueuing(workQueuingConfig.klass);
379            completionSynchronizer = new WorkCompletionSynchronizer();
380            started = true;
381            workQueueConfig.index();
382            for (String id : workQueueConfig.getQueueIds()) {
383                initializeQueue(workQueueConfig.get(id));
384            }
385
386            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
387                @Override
388                public void beforeStop(ComponentManager mgr, boolean isStandby) {
389                    for (String id : workQueueConfig.getQueueIds()) {
390                        deactivateQueue(workQueueConfig.get(id));
391                    }
392                    try {
393                        if (!shutdown(10, TimeUnit.SECONDS)) {
394                            log.error("Some processors are still active");
395                        }
396                    } catch (InterruptedException e) {
397                        Thread.currentThread().interrupt();
398                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
399                    }
400                }
401
402                @Override
403                public void afterStart(ComponentManager mgr, boolean isResume) {
404                    for (String id : workQueueConfig.getQueueIds()) {
405                        activateQueue(workQueueConfig.get(id));
406                    }
407                }
408
409                @Override
410                public void afterStop(ComponentManager mgr, boolean isStandby) {
411                    Framework.getRuntime().getComponentManager().removeListener(this);
412                }
413            });
414        }
415    }
416
417    protected WorkThreadPoolExecutor getExecutor(String queueId) {
418        if (!started) {
419            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
420                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
421                init();
422            } else {
423                throw new IllegalStateException("Work manager not started, could not access to executors");
424            }
425        }
426        WorkQueueDescriptor workQueueDescriptor;
427        synchronized (workQueueConfig) {
428            workQueueDescriptor = workQueueConfig.get(queueId);
429        }
430        if (workQueueDescriptor == null) {
431            throw new IllegalArgumentException("No such work queue: " + queueId);
432        }
433
434        return executors.get(queueId);
435    }
436
437    @Override
438    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
439        WorkThreadPoolExecutor executor = getExecutor(queueId);
440        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
441    }
442
443    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
444            throws InterruptedException {
445        // mark executors as shutting down
446        for (WorkThreadPoolExecutor executor : list) {
447            executor.shutdownAndSuspend();
448        }
449        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
450        // wait until threads termination
451        for (WorkThreadPoolExecutor executor : list) {
452            long t0 = System.currentTimeMillis();
453            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
454                return false;
455            }
456            timeout -= System.currentTimeMillis() - t0;
457        }
458        return true;
459    }
460
461    /**
462     * @deprecated since 10.2 because unused
463     */
464    @Deprecated
465    protected long remainingMillis(long t0, long delay) {
466        long d = System.currentTimeMillis() - t0;
467        if (d > delay) {
468            return 0;
469        }
470        return delay - d;
471    }
472
473    /**
474     * @deprecated since 10.2 because unused
475     */
476    @Deprecated
477    protected synchronized void removeExecutor(String queueId) {
478        executors.remove(queueId);
479    }
480
481    @Override
482    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
483        shutdownInProgress = true;
484        try {
485            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
486        } finally {
487            shutdownInProgress = false;
488            started = false;
489        }
490    }
491
492    /**
493     * A work instance and how to schedule it, for schedule-after-commit.
494     *
495     * @since 5.8
496     */
497    public class WorkScheduling implements Synchronization {
498        public final Work work;
499
500        public final Scheduling scheduling;
501
502        public WorkScheduling(Work work, Scheduling scheduling) {
503            this.work = work;
504            this.scheduling = scheduling;
505        }
506
507        @Override
508        public void beforeCompletion() {
509        }
510
511        @Override
512        public void afterCompletion(int status) {
513            if (status == Status.STATUS_COMMITTED) {
514                schedule(work, scheduling, false);
515            } else if (status == Status.STATUS_ROLLEDBACK) {
516                work.setWorkInstanceState(State.UNKNOWN);
517            } else {
518                throw new IllegalArgumentException("Unsupported transaction status " + status);
519            }
520        }
521    }
522
523    /**
524     * Creates non-daemon threads at normal priority.
525     */
526    private static class NamedThreadFactory implements ThreadFactory {
527
528        private final AtomicInteger threadNumber = new AtomicInteger();
529
530        private final ThreadGroup group;
531
532        private final String prefix;
533
534        public NamedThreadFactory(String prefix) {
535            SecurityManager sm = System.getSecurityManager();
536            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
537            this.prefix = prefix;
538        }
539
540        @Override
541        public Thread newThread(Runnable r) {
542            String name = prefix + threadNumber.incrementAndGet();
543            Thread thread = new Thread(group, r, name);
544            // do not set daemon
545            thread.setPriority(Thread.NORM_PRIORITY);
546            thread.setUncaughtExceptionHandler(this::handleUncaughtException);
547            return thread;
548        }
549
550        protected void handleUncaughtException(Thread t, Throwable e) {
551            Log logLocal = LogFactory.getLog(WorkManagerImpl.class);
552            if (e instanceof RejectedExecutionException) {
553                // we are responsible of this exception, we use it during shutdown phase to not run the task taken just
554                // before shutdown due to race condition, so log it as WARN
555                logLocal.warn("Rejected execution error on thread " + t.getName(), e);
556            } else if (ExceptionUtils.hasInterruptedCause(e)) {
557                logLocal.warn("Interrupted error on thread" + t.getName(), e);
558            } else {
559                logLocal.error(String.format("Uncaught error on thread: %s, "
560                        + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e);
561            }
562        }
563
564    }
565
566    /**
567     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
568     * <p>
569     * Completed tasks are passed to another queue.
570     * <p>
571     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
572     * persisted, etc).
573     *
574     * @since 5.6
575     */
576    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
577
578        protected final String queueId;
579
580        /**
581         * List of running Work instances, in order to be able to interrupt them if requested.
582         */
583        // @GuardedBy("itself")
584        protected final ConcurrentLinkedQueue<Work> running;
585
586        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
587        // Number of work scheduled by this instance
588        protected final Counter scheduledCount;
589
590        // Number of work currently running on this instance
591        protected final Counter runningCount;
592
593        // Number of work completed by this instance
594        protected final Counter completedCount;
595
596        protected final Timer workTimer;
597
598        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
599                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
600            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
601            queueId = queue.queueId;
602            running = new ConcurrentLinkedQueue<>();
603            // init metrics
604            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
605            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
606            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
607            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
608        }
609
610        public int getScheduledOrRunningSize() {
611            int ret = 0;
612            for (String queueId : getWorkQueueIds()) {
613                ret += getQueueSize(queueId, null);
614            }
615            return ret;
616        }
617
618        @Override
619        public void execute(Runnable r) {
620            throw new UnsupportedOperationException("use other api");
621        }
622
623        /**
624         * Executes the given task sometime in the future.
625         *
626         * @param work the work to execute
627         * @see #execute(Runnable)
628         * @deprecated since 10.2 because unused
629         */
630        @Deprecated
631        public void execute(Work work) {
632            scheduledCount.inc();
633            submit(work);
634        }
635
636        /**
637         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
638         * directly
639         */
640        protected void submit(Work work) throws RuntimeException {
641            queuing.workSchedule(queueId, work);
642        }
643
644        @Override
645        protected void beforeExecute(Thread t, Runnable r) {
646            Work work = WorkHolder.getWork(r);
647            if (isShutdown()) {
648                work.setWorkInstanceState(State.SCHEDULED);
649                queuing.workReschedule(queueId, work);
650                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
651            }
652            work.setWorkInstanceState(State.RUNNING);
653            queuing.workRunning(queueId, work);
654            running.add(work);
655            runningCount.inc();
656        }
657
658        @Override
659        protected void afterExecute(Runnable r, Throwable t) {
660            Work work = WorkHolder.getWork(r);
661            try {
662                if (work.isSuspending()) {
663                    log.trace(work + " is suspending, giving up");
664                    return;
665                }
666                if (isShutdown()) {
667                    log.trace("rescheduling " + work.getId(), t);
668                    work.setWorkInstanceState(State.SCHEDULED);
669                    queuing.workReschedule(queueId, work);
670                    return;
671                }
672                work.setWorkInstanceState(State.UNKNOWN);
673                queuing.workCompleted(queueId, work);
674            } finally {
675                running.remove(work);
676                runningCount.dec();
677                completedCount.inc();
678                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
679                completionSynchronizer.signalCompletedWork();
680            }
681        }
682
683        /**
684         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
685         *
686         * @throws InterruptedException
687         */
688        public void shutdownAndSuspend() throws InterruptedException {
689            try {
690                // don't consume the queue anymore
691                deactivateQueueMetrics(queueId);
692                queuing.setActive(queueId, false);
693                // suspend all running work
694                boolean hasRunningWork = false;
695                for (Work work : running) {
696                    work.setWorkInstanceSuspending();
697                    log.trace("suspending and rescheduling " + work.getId());
698                    work.setWorkInstanceState(State.SCHEDULED);
699                    queuing.workReschedule(queueId, work);
700                    hasRunningWork = true;
701                }
702                if (hasRunningWork) {
703                    long shutdownDelay = Long.parseLong(
704                            Framework.getService(ConfigurationService.class).getProperty(SHUTDOWN_DELAY_MS_KEY, "0"));
705                    // sleep for a given amount of time for works to have time to persist their state and stop properly
706                    Thread.sleep(shutdownDelay);
707                }
708                shutdownNow();
709            } finally {
710                executors.remove(queueId);
711            }
712        }
713
714        public void removeScheduled(String workId) {
715            queuing.removeScheduled(queueId, workId);
716        }
717
718    }
719
720    @Override
721    public void schedule(Work work) {
722        schedule(work, Scheduling.ENQUEUE, false);
723    }
724
725    @Override
726    public void schedule(Work work, boolean afterCommit) {
727        schedule(work, Scheduling.ENQUEUE, afterCommit);
728    }
729
730    @Override
731    public void schedule(Work work, Scheduling scheduling) {
732        schedule(work, scheduling, false);
733    }
734
735    @Override
736    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
737        String workId = work.getId();
738        String queueId = getCategoryQueueId(work.getCategory());
739        if (!isQueuingEnabled(queueId)) {
740            return;
741        }
742        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
743            return;
744        }
745        work.setWorkInstanceState(State.SCHEDULED);
746        WorkSchedulePath.newInstance(work);
747        switch (scheduling) {
748        case ENQUEUE:
749            break;
750        case CANCEL_SCHEDULED:
751            getExecutor(queueId).removeScheduled(workId);
752            WorkStateHelper.setCanceled(work.getId());
753            break;
754        case IF_NOT_SCHEDULED:
755        case IF_NOT_RUNNING_OR_SCHEDULED:
756            // TODO disabled for now because hasWorkInState uses isScheduled
757            // which is buggy
758            boolean disabled = Boolean.TRUE.booleanValue();
759            if (!disabled && hasWorkInState(workId, scheduling.state)) {
760                if (log.isDebugEnabled()) {
761                    log.debug("Canceling schedule because found: " + scheduling);
762                }
763                return;
764
765            }
766            break;
767
768        }
769        queuing.workSchedule(queueId, work);
770    }
771
772    /**
773     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
774     *
775     * @since 5.8
776     */
777    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
778        TransactionManager transactionManager;
779        try {
780            transactionManager = TransactionHelper.lookupTransactionManager();
781        } catch (NamingException e) {
782            transactionManager = null;
783        }
784        if (transactionManager == null) {
785            if (log.isDebugEnabled()) {
786                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
787            }
788            return false;
789        }
790        try {
791            Transaction transaction = transactionManager.getTransaction();
792            if (transaction == null) {
793                if (log.isDebugEnabled()) {
794                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
795                }
796                return false;
797            }
798            int status = transaction.getStatus();
799            if (status == Status.STATUS_ACTIVE) {
800                if (log.isDebugEnabled()) {
801                    log.debug("Scheduling work after commit: " + work);
802                }
803                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
804                return true;
805            } else if (status == Status.STATUS_COMMITTED) {
806                // called in afterCompletion, we can schedule immediately
807                if (log.isDebugEnabled()) {
808                    log.debug("Scheduling work immediately: " + work);
809                }
810                return false;
811            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
812                if (log.isDebugEnabled()) {
813                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
814                }
815                return true;
816            } else {
817                if (log.isDebugEnabled()) {
818                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
819                            + work);
820                }
821                return false;
822            }
823        } catch (SystemException | RollbackException e) {
824            log.error("Cannot schedule after commit", e);
825            return false;
826        }
827    }
828
829    @Override
830    public Work find(String workId, State state) {
831        return queuing.find(workId, state);
832    }
833
834    /**
835     * @param state SCHEDULED, RUNNING or null for both
836     */
837    protected boolean hasWorkInState(String workId, State state) {
838        return queuing.isWorkInState(workId, state);
839    }
840
841    @Override
842    public State getWorkState(String workId) {
843        return queuing.getWorkState(workId);
844    }
845
846    @Override
847    public List<Work> listWork(String queueId, State state) {
848        // don't return scheduled after commit
849        return queuing.listWork(queueId, state);
850    }
851
852    @Override
853    public List<String> listWorkIds(String queueId, State state) {
854        return queuing.listWorkIds(queueId, state);
855    }
856
857    @Override
858    public WorkQueueMetrics getMetrics(String queueId) {
859        return queuing.metrics(queueId);
860    }
861
862    @Override
863    public int getQueueSize(String queueId, State state) {
864        WorkQueueMetrics metrics = getMetrics(queueId);
865        if (state == null) {
866            return metrics.scheduled.intValue() + metrics.running.intValue();
867        }
868        if (state == State.SCHEDULED) {
869            return metrics.scheduled.intValue();
870        } else if (state == State.RUNNING) {
871            return metrics.running.intValue();
872        } else {
873            throw new IllegalArgumentException(String.valueOf(state));
874        }
875    }
876
877    @Override
878    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
879        return awaitCompletion(null, duration, unit);
880    }
881
882    @Override
883    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
884        if (!isStarted()) {
885            return true;
886        }
887        SequenceTracer.start("awaitCompletion on " + (queueId == null ? "all queues" : queueId));
888        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
889        long deadline = getTimestampAfter(durationInMs);
890        int pause = (int) Math.min(durationInMs, 500L);
891        log.debug("awaitForCompletion " + durationInMs + " ms");
892        do {
893            if (noScheduledOrRunningWork(queueId)) {
894                completionSynchronizer.signalCompletedWork();
895                SequenceTracer.stop("done");
896                return true;
897            }
898            completionSynchronizer.waitForCompletedWork(pause);
899        } while (System.currentTimeMillis() < deadline);
900        log.info("awaitCompletion timeout after " + durationInMs + " ms");
901        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
902        return false;
903    }
904
905    protected long getTimestampAfter(long durationInMs) {
906        long ret = System.currentTimeMillis() + durationInMs;
907        if (ret < 0) {
908            ret = Long.MAX_VALUE;
909        }
910        return ret;
911    }
912
913    protected boolean noScheduledOrRunningWork(String queueId) {
914        if (queueId == null) {
915            for (String id : getWorkQueueIds()) {
916                if (!noScheduledOrRunningWork(id)) {
917                    return false;
918                }
919            }
920            return true;
921        }
922        if (!isProcessingEnabled(queueId)) {
923            return getExecutor(queueId).runningCount.getCount() == 0L;
924        }
925        if (getQueueSize(queueId, null) > 0) {
926            if (log.isTraceEnabled()) {
927                log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: "
928                        + getQueueSize(queueId, State.RUNNING));
929            }
930            return false;
931        }
932        if (log.isTraceEnabled()) {
933            log.trace(queueId + " is completed");
934        }
935        return true;
936    }
937
938    @Override
939    public boolean isStarted() {
940        return started && !shutdownInProgress;
941    }
942
943}