001/*
002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.RejectedExecutionException;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.common.logging.SequenceTracer;
048import org.nuxeo.common.utils.ExceptionUtils;
049import org.nuxeo.ecm.core.api.NuxeoException;
050import org.nuxeo.ecm.core.event.EventServiceComponent;
051import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
052import org.nuxeo.ecm.core.work.api.Work;
053import org.nuxeo.ecm.core.work.api.Work.State;
054import org.nuxeo.ecm.core.work.api.WorkManager;
055import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
056import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
057import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
058import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.metrics.MetricsService;
061import org.nuxeo.runtime.metrics.NuxeoMetricSet;
062import org.nuxeo.runtime.model.ComponentContext;
063import org.nuxeo.runtime.model.ComponentInstance;
064import org.nuxeo.runtime.model.ComponentManager;
065import org.nuxeo.runtime.model.DefaultComponent;
066import org.nuxeo.runtime.transaction.TransactionHelper;
067
068import com.codahale.metrics.Counter;
069import com.codahale.metrics.MetricRegistry;
070import com.codahale.metrics.SharedMetricRegistries;
071import com.codahale.metrics.Timer;
072
073/**
074 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
075 * implementation.
076 *
077 * @since 5.6
078 */
079public class WorkManagerImpl extends DefaultComponent implements WorkManager {
080
    /** Component name of the work manager service. */
    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);

    /** Extension point contributing {@link WorkQueueDescriptor}s. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point contributing the {@link WorkQueuingDescriptor} that selects the queuing implementation. */
    protected static final String IMPL_EP = "implementation";

    /** Queue id used when a work category is not mapped to any configured queue. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category assumed for works that do not declare one. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of worker thread names; the queue id, a dash and a counter are appended to it. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /** Shared metric registry holding the per-queue counters, timers and gauges. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    /** Registry of contributed work queue descriptors; some accessors synchronize on it. */
    // @GuardedBy("itself")
    protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry();

    /** Registry of contributed queuing implementation descriptors, read by init(). */
    protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry();

    /** Thread pool executors keyed by queue id, filled by initializeQueue(). */
    // used synchronized
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Queuing implementation; {@code null} until init() creates it. */
    protected WorkQueuing queuing;
106
107    /**
108     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
109     * node in cluster mode.
110     */
111    protected class WorkCompletionSynchronizer {
112        final protected ReentrantLock lock = new ReentrantLock();
113
114        final protected Condition condition = lock.newCondition();
115
116        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
117            lock.lock();
118            try {
119                return condition.await(timeMs, TimeUnit.MILLISECONDS);
120            } finally {
121                lock.unlock();
122            }
123        }
124
125        protected void signalCompletedWork() {
126            lock.lock();
127            try {
128                condition.signalAll();
129            } finally {
130                lock.unlock();
131            }
132        }
133
134    }
135
136    protected WorkCompletionSynchronizer completionSynchronizer;
137
138    @Override
139    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
140        if (QUEUES_EP.equals(extensionPoint)) {
141            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
142        } else if (IMPL_EP.equals(extensionPoint)) {
143            registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
144        } else {
145            throw new RuntimeException("Unknown extension point: " + extensionPoint);
146        }
147    }
148
149    @Override
150    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
151        if (QUEUES_EP.equals(extensionPoint)) {
152            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
153        } else if (IMPL_EP.equals(extensionPoint)) {
154            unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
155        } else {
156            throw new RuntimeException("Unknown extension point: " + extensionPoint);
157        }
158    }
159
160    void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
161        String queueId = workQueueDescriptor.id;
162        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
163            Boolean processing = workQueueDescriptor.processing;
164            if (processing == null) {
165                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
166                        + " with no processing/queuing");
167                return;
168            }
169            String what = " processing=" + processing;
170            what += queuing == null ? "" : (" queuing=" + queuing);
171            log.info("Setting on all work queues:" + what);
172            // activate/deactivate processing/queuing on all queues
173            List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy
174            for (String id : queueIds) {
175                // add an updated contribution redefining processing/queuing
176                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
177                wqd.id = id;
178                wqd.processing = processing;
179                registerWorkQueueDescriptor(wqd);
180            }
181            return;
182        }
183        workQueueConfig.addContribution(workQueueDescriptor);
184        WorkQueueDescriptor wqd = workQueueConfig.get(queueId);
185        log.info("Registered work queue " + queueId + " " + wqd.toString());
186    }
187
188    void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
189        String id = workQueueDescriptor.id;
190        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
191            return;
192        }
193        workQueueConfig.removeContribution(workQueueDescriptor);
194        log.info("Unregistered work queue " + id);
195    }
196
197    void initializeQueue(WorkQueueDescriptor config) {
198        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
199            throw new IllegalArgumentException("cannot initialize all queues");
200        }
201        if (queuing.getQueue(config.id) != null) {
202            throw new IllegalStateException("work queue " + config.id + " is already initialized");
203        }
204        if (executors.containsKey(config.id)) {
205            throw new IllegalStateException("work queue " + config.id + " already have an executor");
206        }
207        NuxeoBlockingQueue queue = queuing.init(config);
208        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
209        int maxPoolSize = config.getMaxThreads();
210        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
211                queue, threadFactory);
212        // prestart all core threads so that direct additions to the queue
213        // (from another Nuxeo instance) can be seen
214        executor.prestartAllCoreThreads();
215        executors.put(config.id, executor);
216        log.info("Initialized work queue " + config.id + " " + config.toEffectiveString());
217    }
218
219    void activateQueue(WorkQueueDescriptor config) {
220        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
221            throw new IllegalArgumentException("cannot activate all queues");
222        }
223        queuing.setActive(config.id, config.isProcessingEnabled());
224        log.info("Activated work queue " + config.id + " " + config.toEffectiveString());
225        // Enable metrics
226        if (config.isProcessingEnabled()) {
227            activateQueueMetrics(config.id);
228        }
229    }
230
231    void deactivateQueue(WorkQueueDescriptor config) {
232        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
233            throw new IllegalArgumentException("cannot deactivate all queues");
234        }
235        // Disable metrics
236        if (config.isProcessingEnabled()) {
237            deactivateQueueMetrics(config.id);
238        }
239        queuing.setActive(config.id, false);
240        log.info("Deactivated work queue " + config.id);
241    }
242
243    void activateQueueMetrics(String queueId) {
244        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
245        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled");
246        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
247        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
248        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
249        registry.registerAll(queueMetrics);
250    }
251
252    void deactivateQueueMetrics(String queueId) {
253        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
254        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
255    }
256
257    void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
258        workQueuingConfig.addContribution(descr);
259    }
260
261    void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
262        workQueuingConfig.removeContribution(descr);
263    }
264
265    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
266        try {
267            return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
268        } catch (ReflectiveOperationException | SecurityException e) {
269            throw new RuntimeException(e);
270        }
271    }
272
273    @Override
274    public boolean isQueuingEnabled(String queueId) {
275        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
276        return wqd != null && wqd.isQueuingEnabled();
277    }
278
279    @Override
280    public void enableProcessing(boolean value) {
281        workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value));
282    }
283
284    @Override
285    public void enableProcessing(String queueId, boolean value) {
286        WorkQueueDescriptor config = workQueueConfig.get(queueId);
287        if (config == null) {
288            throw new IllegalArgumentException("no such queue " + queueId);
289        }
290        if (!value) {
291            deactivateQueue(config);
292        } else {
293            activateQueue(config);
294        }
295    }
296
297    @Override
298    public boolean isProcessingEnabled() {
299        for (String queueId : workQueueConfig.getQueueIds()) {
300            if (queuing.getQueue(queueId).active) {
301                return true;
302            }
303        }
304        return false;
305    }
306
307    @Override
308    public boolean isProcessingEnabled(String queueId) {
309        if (queueId == null) {
310            return isProcessingEnabled();
311        }
312        return queuing.getQueue(queueId).active;
313    }
314
315    // ----- WorkManager -----
316
317    @Override
318    public List<String> getWorkQueueIds() {
319        synchronized (workQueueConfig) {
320            return workQueueConfig.getQueueIds();
321        }
322    }
323
324    @Override
325    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
326        synchronized (workQueueConfig) {
327            return workQueueConfig.get(queueId);
328        }
329    }
330
331    @Override
332    public String getCategoryQueueId(String category) {
333        if (category == null) {
334            category = DEFAULT_CATEGORY;
335        }
336        String queueId = workQueueConfig.getQueueId(category);
337        if (queueId == null) {
338            queueId = DEFAULT_QUEUE_ID;
339        }
340        return queueId;
341    }
342
343    @Override
344    public int getApplicationStartedOrder() {
345        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
346    }
347
348    @Override
349    public void start(ComponentContext context) {
350        init();
351    }
352
353    @Override
354    public void stop(ComponentContext context) throws InterruptedException {
355        // do nothing: we shutdown the thread pool at first before stopping any component
356        // see ComponentManager.Listener.beforeStop
357    }
358
    /** Set by init() once the queuing implementation exists; reset to false at the end of shutdown(). */
    protected volatile boolean started = false;

    /** True while shutdown() is running. */
    protected volatile boolean shutdownInProgress = false;
362
    /**
     * Starts the work manager unless it is already started.
     * <p>
     * Creates the {@link WorkQueuing} implementation from the contributed queuing descriptor and the completion
     * synchronizer, indexes the queue registry, and initializes a queue and an executor for each configured queue.
     * Then registers a {@link ComponentManager.Listener} that:
     * <ul>
     * <li>activates all queues after the runtime starts;</li>
     * <li>deactivates them and shuts the executors down before it stops;</li>
     * <li>removes itself after it stops.</li>
     * </ul>
     */
    @Override
    public void init() {
        // lock-free fast path; started is volatile
        if (started) {
            return;
        }
        synchronized (this) {
            // re-check under the lock: another thread may have completed init() meanwhile
            if (started) {
                return;
            }
            queuing = newWorkQueuing(workQueuingConfig.klass);
            completionSynchronizer = new WorkCompletionSynchronizer();
            // NOTE(review): started is published before the executors below are created, so a concurrent
            // getExecutor() that sees started == true may not find an executor yet — confirm before reordering
            started = true;
            workQueueConfig.index();
            for (String id : workQueueConfig.getQueueIds()) {
                initializeQueue(workQueueConfig.get(id));
            }

            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
                @Override
                public void beforeStop(ComponentManager mgr, boolean isStandby) {
                    // stop consuming the queues, then shut the executors down, waiting at most 10 seconds
                    for (String id : workQueueConfig.getQueueIds()) {
                        deactivateQueue(workQueueConfig.get(id));
                    }
                    try {
                        if (!shutdown(10, TimeUnit.SECONDS)) {
                            log.error("Some processors are still active");
                        }
                    } catch (InterruptedException e) {
                        // restore the interrupt flag before propagating as an unchecked exception
                        Thread.currentThread().interrupt();
                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
                    }
                }

                @Override
                public void afterStart(ComponentManager mgr, boolean isResume) {
                    for (String id : workQueueConfig.getQueueIds()) {
                        activateQueue(workQueueConfig.get(id));
                    }
                }

                @Override
                public void afterStop(ComponentManager mgr, boolean isStandby) {
                    // one-shot listener: a later init() registers a new one
                    Framework.getRuntime().getComponentManager().removeListener(this);
                }
            });
        }
    }
410
411    protected WorkThreadPoolExecutor getExecutor(String queueId) {
412        if (!started) {
413            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
414                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
415                init();
416            } else {
417                throw new IllegalStateException("Work manager not started, could not access to executors");
418            }
419        }
420        WorkQueueDescriptor workQueueDescriptor;
421        synchronized (workQueueConfig) {
422            workQueueDescriptor = workQueueConfig.get(queueId);
423        }
424        if (workQueueDescriptor == null) {
425            throw new IllegalArgumentException("No such work queue: " + queueId);
426        }
427
428        return executors.get(queueId);
429    }
430
431    @Override
432    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
433        WorkThreadPoolExecutor executor = getExecutor(queueId);
434        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
435    }
436
437    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
438            throws InterruptedException {
439        // mark executors as shutting down
440        for (WorkThreadPoolExecutor executor : list) {
441            executor.shutdownAndSuspend();
442        }
443        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
444        // wait until threads termination
445        for (WorkThreadPoolExecutor executor : list) {
446            long t0 = System.currentTimeMillis();
447            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
448                return false;
449            }
450            timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS);
451        }
452        return true;
453    }
454
455    protected long remainingMillis(long t0, long delay) {
456        long d = System.currentTimeMillis() - t0;
457        if (d > delay) {
458            return 0;
459        }
460        return delay - d;
461    }
462
463    protected synchronized void removeExecutor(String queueId) {
464        executors.remove(queueId);
465    }
466
467    @Override
468    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
469        shutdownInProgress = true;
470        try {
471            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
472        } finally {
473            shutdownInProgress = false;
474            started = false;
475        }
476    }
477
478    /**
479     * A work instance and how to schedule it, for schedule-after-commit.
480     *
481     * @since 5.8
482     */
483    public class WorkScheduling implements Synchronization {
484        public final Work work;
485
486        public final Scheduling scheduling;
487
488        public WorkScheduling(Work work, Scheduling scheduling) {
489            this.work = work;
490            this.scheduling = scheduling;
491        }
492
493        @Override
494        public void beforeCompletion() {
495        }
496
497        @Override
498        public void afterCompletion(int status) {
499            if (status == Status.STATUS_COMMITTED) {
500                schedule(work, scheduling, false);
501            } else if (status == Status.STATUS_ROLLEDBACK) {
502                work.setWorkInstanceState(State.UNKNOWN);
503            } else {
504                throw new IllegalArgumentException("Unsupported transaction status " + status);
505            }
506        }
507    }
508
509    /**
510     * Creates non-daemon threads at normal priority.
511     */
512    private static class NamedThreadFactory implements ThreadFactory {
513
514        private final AtomicInteger threadNumber = new AtomicInteger();
515
516        private final ThreadGroup group;
517
518        private final String prefix;
519
520        public NamedThreadFactory(String prefix) {
521            SecurityManager sm = System.getSecurityManager();
522            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
523            this.prefix = prefix;
524        }
525
526        @Override
527        public Thread newThread(Runnable r) {
528            String name = prefix + threadNumber.incrementAndGet();
529            Thread thread = new Thread(group, r, name);
530            // do not set daemon
531            thread.setPriority(Thread.NORM_PRIORITY);
532            thread.setUncaughtExceptionHandler(this::handleUncaughtException);
533            return thread;
534        }
535
536        protected void handleUncaughtException(Thread t, Throwable e) {
537            Log logLocal = LogFactory.getLog(WorkManagerImpl.class);
538            if (e instanceof RejectedExecutionException) {
539                // we are responsible of this exception, we use it during shutdown phase to not run the task taken just
540                // before shutdown due to race condition, so log it as WARN
541                logLocal.warn("Rejected execution error on thread " + t.getName(), e);
542            } else if (ExceptionUtils.hasInterruptedCause(e)) {
543                logLocal.warn("Interrupted error on thread" + t.getName(), e);
544            } else {
545                logLocal.error(String.format("Uncaught error on thread: %s, "
546                        + "current work might be lost, WorkManager metrics might be corrupted.", t.getName()), e);
547            }
548        }
549
550    }
551
552    /**
553     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
554     * <p>
555     * Completed tasks are passed to another queue.
556     * <p>
557     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
558     * persisted, etc).
559     *
560     * @since 5.6
561     */
562    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
563
564        protected final String queueId;
565
566        /**
567         * List of running Work instances, in order to be able to interrupt them if requested.
568         */
569        // @GuardedBy("itself")
570        protected final ConcurrentLinkedQueue<Work> running;
571
572        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
573        // Number of work scheduled by this instance
574        protected final Counter scheduledCount;
575
576        // Number of work currently running on this instance
577        protected final Counter runningCount;
578
579        // Number of work completed by this instance
580        protected final Counter completedCount;
581
582        protected final Timer workTimer;
583
584        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
585                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
586            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
587            queueId = queue.queueId;
588            running = new ConcurrentLinkedQueue<>();
589            // init metrics
590            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
591            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
592            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
593            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
594        }
595
596        public int getScheduledOrRunningSize() {
597            int ret = 0;
598            for (String queueId : getWorkQueueIds()) {
599                ret += getQueueSize(queueId, null);
600            }
601            return ret;
602        }
603
604        @Override
605        public void execute(Runnable r) {
606            throw new UnsupportedOperationException("use other api");
607        }
608
609        /**
610         * Executes the given task sometime in the future.
611         *
612         * @param work the work to execute
613         * @see #execute(Runnable)
614         */
615        public void execute(Work work) {
616            scheduledCount.inc();
617            submit(work);
618        }
619
620        /**
621         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
622         * directly
623         */
624        protected void submit(Work work) throws RuntimeException {
625            queuing.workSchedule(queueId, work);
626        }
627
628        @Override
629        protected void beforeExecute(Thread t, Runnable r) {
630            Work work = WorkHolder.getWork(r);
631            if (isShutdown()) {
632                work.setWorkInstanceState(State.SCHEDULED);
633                queuing.workReschedule(queueId, work);
634                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
635            }
636            work.setWorkInstanceState(State.RUNNING);
637            queuing.workRunning(queueId, work);
638            running.add(work);
639            runningCount.inc();
640        }
641
642        @Override
643        protected void afterExecute(Runnable r, Throwable t) {
644            Work work = WorkHolder.getWork(r);
645            try {
646                if (work.isSuspending()) {
647                    log.trace(work + " is suspending, giving up");
648                    return;
649                }
650                if (isShutdown()) {
651                    log.trace("rescheduling " + work.getId(), t);
652                    work.setWorkInstanceState(State.SCHEDULED);
653                    queuing.workReschedule(queueId, work);
654                    return;
655                }
656                work.setWorkInstanceState(State.UNKNOWN);
657                queuing.workCompleted(queueId, work);
658            } finally {
659                running.remove(work);
660                runningCount.dec();
661                completedCount.inc();
662                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
663                completionSynchronizer.signalCompletedWork();
664            }
665        }
666
667        /**
668         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
669         *
670         * @throws InterruptedException
671         */
672        public void shutdownAndSuspend() throws InterruptedException {
673            try {
674                // don't consume the queue anymore
675                deactivateQueueMetrics(queueId);
676                queuing.setActive(queueId, false);
677                // suspend all running work
678                for (Work work : running) {
679                    work.setWorkInstanceSuspending();
680                    log.trace("suspending and rescheduling " + work.getId());
681                    work.setWorkInstanceState(State.SCHEDULED);
682                    queuing.workReschedule(queueId, work);
683                }
684                shutdownNow();
685            } finally {
686                executors.remove(queueId);
687            }
688        }
689
690        public void removeScheduled(String workId) {
691            queuing.removeScheduled(queueId, workId);
692        }
693
694    }
695
696    @Override
697    public void schedule(Work work) {
698        schedule(work, Scheduling.ENQUEUE, false);
699    }
700
701    @Override
702    public void schedule(Work work, boolean afterCommit) {
703        schedule(work, Scheduling.ENQUEUE, afterCommit);
704    }
705
706    @Override
707    public void schedule(Work work, Scheduling scheduling) {
708        schedule(work, scheduling, false);
709    }
710
711    @Override
712    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
713        String workId = work.getId();
714        String queueId = getCategoryQueueId(work.getCategory());
715        if (!isQueuingEnabled(queueId)) {
716            return;
717        }
718        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
719            return;
720        }
721        work.setWorkInstanceState(State.SCHEDULED);
722        WorkSchedulePath.newInstance(work);
723        switch (scheduling) {
724        case ENQUEUE:
725            break;
726        case CANCEL_SCHEDULED:
727            getExecutor(queueId).removeScheduled(workId);
728            break;
729        case IF_NOT_SCHEDULED:
730        case IF_NOT_RUNNING_OR_SCHEDULED:
731            // TODO disabled for now because hasWorkInState uses isScheduled
732            // which is buggy
733            boolean disabled = Boolean.TRUE.booleanValue();
734            if (!disabled && hasWorkInState(workId, scheduling.state)) {
735                if (log.isDebugEnabled()) {
736                    log.debug("Canceling schedule because found: " + scheduling);
737                }
738                return;
739
740            }
741            break;
742
743        }
744        queuing.workSchedule(queueId, work);
745    }
746
747    /**
748     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
749     *
750     * @since 5.8
751     */
752    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
753        TransactionManager transactionManager;
754        try {
755            transactionManager = TransactionHelper.lookupTransactionManager();
756        } catch (NamingException e) {
757            transactionManager = null;
758        }
759        if (transactionManager == null) {
760            if (log.isDebugEnabled()) {
761                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
762            }
763            return false;
764        }
765        try {
766            Transaction transaction = transactionManager.getTransaction();
767            if (transaction == null) {
768                if (log.isDebugEnabled()) {
769                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
770                }
771                return false;
772            }
773            int status = transaction.getStatus();
774            if (status == Status.STATUS_ACTIVE) {
775                if (log.isDebugEnabled()) {
776                    log.debug("Scheduling work after commit: " + work);
777                }
778                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
779                return true;
780            } else if (status == Status.STATUS_COMMITTED) {
781                // called in afterCompletion, we can schedule immediately
782                if (log.isDebugEnabled()) {
783                    log.debug("Scheduling work immediately: " + work);
784                }
785                return false;
786            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
787                if (log.isDebugEnabled()) {
788                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
789                }
790                return true;
791            } else {
792                if (log.isDebugEnabled()) {
793                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
794                            + work);
795                }
796                return false;
797            }
798        } catch (SystemException | RollbackException e) {
799            log.error("Cannot schedule after commit", e);
800            return false;
801        }
802    }
803
804    @Override
805    public Work find(String workId, State state) {
806        return queuing.find(workId, state);
807    }
808
809    /**
810     * @param state SCHEDULED, RUNNING or null for both
811     */
812    protected boolean hasWorkInState(String workId, State state) {
813        return queuing.isWorkInState(workId, state);
814    }
815
816    @Override
817    public State getWorkState(String workId) {
818        return queuing.getWorkState(workId);
819    }
820
821    @Override
822    public List<Work> listWork(String queueId, State state) {
823        // don't return scheduled after commit
824        return queuing.listWork(queueId, state);
825    }
826
827    @Override
828    public List<String> listWorkIds(String queueId, State state) {
829        return queuing.listWorkIds(queueId, state);
830    }
831
832    @Override
833    public WorkQueueMetrics getMetrics(String queueId) {
834        return queuing.metrics(queueId);
835    }
836
837    @Override
838    public int getQueueSize(String queueId, State state) {
839        WorkQueueMetrics metrics = getMetrics(queueId);
840        if (state == null) {
841            return metrics.scheduled.intValue() + metrics.running.intValue();
842        }
843        if (state == State.SCHEDULED) {
844            return metrics.scheduled.intValue();
845        } else if (state == State.RUNNING) {
846            return metrics.running.intValue();
847        } else {
848            throw new IllegalArgumentException(String.valueOf(state));
849        }
850    }
851
852    @Override
853    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
854        return awaitCompletion(null, duration, unit);
855    }
856
857    @Override
858    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
859        if (!isStarted()) {
860            return true;
861        }
862        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
863        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
864        long deadline = getTimestampAfter(durationInMs);
865        int pause = (int) Math.min(durationInMs, 500L);
866        log.debug("awaitForCompletion " + durationInMs + " ms");
867        do {
868            if (noScheduledOrRunningWork(queueId)) {
869                completionSynchronizer.signalCompletedWork();
870                SequenceTracer.stop("done");
871                return true;
872            }
873            completionSynchronizer.waitForCompletedWork(pause);
874        } while (System.currentTimeMillis() < deadline);
875        log.info("awaitCompletion timeout after " + durationInMs + " ms");
876        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
877        return false;
878    }
879
880    protected long getTimestampAfter(long durationInMs) {
881        long ret = System.currentTimeMillis() + durationInMs;
882        if (ret < 0) {
883            ret = Long.MAX_VALUE;
884        }
885        return ret;
886    }
887
888    protected boolean noScheduledOrRunningWork(String queueId) {
889        if (queueId == null) {
890            for (String id : getWorkQueueIds()) {
891                if (!noScheduledOrRunningWork(id)) {
892                    return false;
893                }
894            }
895            return true;
896        }
897        if (!isProcessingEnabled(queueId)) {
898            return getExecutor(queueId).runningCount.getCount() == 0L;
899        }
900        if (getQueueSize(queueId, null) > 0) {
901            if (log.isTraceEnabled()) {
902                log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: "
903                        + getQueueSize(queueId, State.RUNNING));
904            }
905            return false;
906        }
907        if (log.isTraceEnabled()) {
908            log.trace(queueId + " is completed");
909        }
910        return true;
911    }
912
913    @Override
914    public boolean isStarted() {
915        return started && !shutdownInProgress;
916    }
917
918}