001/*
002 * (C) Copyright 2012-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.RejectedExecutionException;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.common.logging.SequenceTracer;
048import org.nuxeo.ecm.core.api.NuxeoException;
049import org.nuxeo.ecm.core.event.EventServiceComponent;
050import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.Work.State;
053import org.nuxeo.ecm.core.work.api.WorkManager;
054import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
055import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
056import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
057import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
058import org.nuxeo.runtime.api.Framework;
059import org.nuxeo.runtime.metrics.MetricsService;
060import org.nuxeo.runtime.metrics.NuxeoMetricSet;
061import org.nuxeo.runtime.model.ComponentContext;
062import org.nuxeo.runtime.model.ComponentInstance;
063import org.nuxeo.runtime.model.ComponentManager;
064import org.nuxeo.runtime.model.DefaultComponent;
065import org.nuxeo.runtime.transaction.TransactionHelper;
066
067import com.codahale.metrics.Counter;
068import com.codahale.metrics.MetricRegistry;
069import com.codahale.metrics.SharedMetricRegistries;
070import com.codahale.metrics.Timer;
071
072/**
073 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
074 * implementation.
075 *
076 * @since 5.6
077 */
078public class WorkManagerImpl extends DefaultComponent implements WorkManager {
079
080    public static final String NAME = "org.nuxeo.ecm.core.work.service";
081
082    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);
083
084    protected static final String QUEUES_EP = "queues";
085
086    protected static final String IMPL_EP = "implementation";
087
088    public static final String DEFAULT_QUEUE_ID = "default";
089
090    public static final String DEFAULT_CATEGORY = "default";
091
092    protected static final String THREAD_PREFIX = "Nuxeo-Work-";
093
094    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
095
096    // @GuardedBy("itself")
097    protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry();
098
099    protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry();
100
101    // used synchronized
102    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();
103
104    protected WorkQueuing queuing;
105
106    /**
107     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
108     * node in cluster mode.
109     */
110    protected class WorkCompletionSynchronizer {
111        final protected ReentrantLock lock = new ReentrantLock();
112
113        final protected Condition condition = lock.newCondition();
114
115        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
116            lock.lock();
117            try {
118                return condition.await(timeMs, TimeUnit.MILLISECONDS);
119            } finally {
120                lock.unlock();
121            }
122        }
123
124        protected void signalCompletedWork() {
125            lock.lock();
126            try {
127                condition.signalAll();
128            } finally {
129                lock.unlock();
130            }
131        }
132
133    }
134
135    protected WorkCompletionSynchronizer completionSynchronizer;
136
137    @Override
138    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
139        if (QUEUES_EP.equals(extensionPoint)) {
140            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
141        } else if (IMPL_EP.equals(extensionPoint)) {
142            registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
143        } else {
144            throw new RuntimeException("Unknown extension point: " + extensionPoint);
145        }
146    }
147
148    @Override
149    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
150        if (QUEUES_EP.equals(extensionPoint)) {
151            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
152        } else if (IMPL_EP.equals(extensionPoint)) {
153            unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
154        } else {
155            throw new RuntimeException("Unknown extension point: " + extensionPoint);
156        }
157    }
158
159    void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
160        String queueId = workQueueDescriptor.id;
161        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
162            Boolean processing = workQueueDescriptor.processing;
163            if (processing == null) {
164                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
165                        + " with no processing/queuing");
166                return;
167            }
168            String what = " processing=" + processing;
169            what += queuing == null ? "" : (" queuing=" + queuing);
170            log.info("Setting on all work queues:" + what);
171            // activate/deactivate processing/queuing on all queues
172            List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy
173            for (String id : queueIds) {
174                // add an updated contribution redefining processing/queuing
175                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
176                wqd.id = id;
177                wqd.processing = processing;
178                registerWorkQueueDescriptor(wqd);
179            }
180            return;
181        }
182        workQueueConfig.addContribution(workQueueDescriptor);
183        WorkQueueDescriptor wqd = workQueueConfig.get(queueId);
184        log.info("Registered work queue " + queueId + " " + wqd.toString());
185    }
186
187    void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
188        String id = workQueueDescriptor.id;
189        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
190            return;
191        }
192        workQueueConfig.removeContribution(workQueueDescriptor);
193        log.info("Unregistered work queue " + id);
194    }
195
196    void initializeQueue(WorkQueueDescriptor config) {
197        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
198            throw new IllegalArgumentException("cannot initialize all queues");
199        }
200        if (queuing.getQueue(config.id) != null) {
201            throw new IllegalStateException("work queue " + config.id + " is already initialized");
202        }
203        if (executors.containsKey(config.id)) {
204            throw new IllegalStateException("work queue " + config.id + " already have an executor");
205        }
206        NuxeoBlockingQueue queue = queuing.init(config);
207        ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
208        int maxPoolSize = config.getMaxThreads();
209        WorkThreadPoolExecutor executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS,
210                queue, threadFactory);
211        // prestart all core threads so that direct additions to the queue
212        // (from another Nuxeo instance) can be seen
213        executor.prestartAllCoreThreads();
214        executors.put(config.id, executor);
215        log.info("Initialized work queue " + config.id + " " + config.toEffectiveString());
216    }
217
218    void activateQueue(WorkQueueDescriptor config) {
219        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
220            throw new IllegalArgumentException("cannot activate all queues");
221        }
222        queuing.setActive(config.id, config.isProcessingEnabled());
223        log.info("Activated work queue " + config.id + " " + config.toEffectiveString());
224        // Enable metrics
225        if (config.isProcessingEnabled()) {
226            activateQueueMetrics(config.id);
227        }
228    }
229
230    void deactivateQueue(WorkQueueDescriptor config) {
231        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
232            throw new IllegalArgumentException("cannot deactivate all queues");
233        }
234        // Disable metrics
235        if (config.isProcessingEnabled()) {
236            deactivateQueueMetrics(config.id);
237        }
238        queuing.setActive(config.id, false);
239        log.info("Deactivated work queue " + config.id);
240    }
241
242    void activateQueueMetrics(String queueId) {
243        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
244        queueMetrics.putGauge(() -> getMetrics(queueId).scheduled, "scheduled", "count");
245        queueMetrics.putGauge(() -> getMetrics(queueId).running, "running");
246        queueMetrics.putGauge(() -> getMetrics(queueId).completed, "completed");
247        queueMetrics.putGauge(() -> getMetrics(queueId).canceled, "canceled");
248        registry.registerAll(queueMetrics);
249    }
250
251    void deactivateQueueMetrics(String queueId) {
252        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
253        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
254    }
255
256    void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
257        workQueuingConfig.addContribution(descr);
258    }
259
260    void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
261        workQueuingConfig.removeContribution(descr);
262    }
263
264    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
265        try {
266            return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
267        } catch (ReflectiveOperationException | SecurityException e) {
268            throw new RuntimeException(e);
269        }
270    }
271
272    @Override
273    public boolean isQueuingEnabled(String queueId) {
274        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
275        return wqd != null && wqd.isQueuingEnabled();
276    }
277
278    @Override
279    public void enableProcessing(boolean value) {
280        workQueueConfig.getQueueIds().forEach(id -> enableProcessing(id, value));
281    }
282
283    @Override
284    public void enableProcessing(String queueId, boolean value) {
285        WorkQueueDescriptor config = workQueueConfig.get(queueId);
286        if (config == null) {
287            throw new IllegalArgumentException("no such queue " + queueId);
288        }
289        if (!value) {
290            deactivateQueue(config);
291        } else {
292            activateQueue(config);
293        }
294    }
295
296    @Override
297    public boolean isProcessingEnabled() {
298        for (String queueId : workQueueConfig.getQueueIds()) {
299            if (queuing.getQueue(queueId).active) {
300                return true;
301            }
302        }
303        return false;
304    }
305
306    @Override
307    public boolean isProcessingEnabled(String queueId) {
308        if (queueId == null) {
309            return isProcessingEnabled();
310        }
311        return queuing.getQueue(queueId).active;
312    }
313
314    // ----- WorkManager -----
315
316    @Override
317    public List<String> getWorkQueueIds() {
318        synchronized (workQueueConfig) {
319            return workQueueConfig.getQueueIds();
320        }
321    }
322
323    @Override
324    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
325        synchronized (workQueueConfig) {
326            return workQueueConfig.get(queueId);
327        }
328    }
329
330    @Override
331    public String getCategoryQueueId(String category) {
332        if (category == null) {
333            category = DEFAULT_CATEGORY;
334        }
335        String queueId = workQueueConfig.getQueueId(category);
336        if (queueId == null) {
337            queueId = DEFAULT_QUEUE_ID;
338        }
339        return queueId;
340    }
341
342    @Override
343    public int getApplicationStartedOrder() {
344        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
345    }
346
347    @Override
348    public void start(ComponentContext context) {
349        init();
350    }
351
352    @Override
353    public void stop(ComponentContext context) throws InterruptedException {
354        // do nothing: we shutdown the thread pool at first before stopping any component
355        // see LifeCycleHandler.beforeStop
356    }
357
358    protected volatile boolean started = false;
359
360    protected volatile boolean shutdownInProgress = false;
361
362    @Override
363    public void init() {
364        if (started) {
365            return;
366        }
367        synchronized (this) {
368            if (started) {
369                return;
370            }
371            queuing = newWorkQueuing(workQueuingConfig.klass);
372            completionSynchronizer = new WorkCompletionSynchronizer();
373            started = true;
374            workQueueConfig.index();
375            for (String id : workQueueConfig.getQueueIds()) {
376                initializeQueue(workQueueConfig.get(id));
377            }
378
379            Framework.getRuntime().getComponentManager().addListener(new ComponentManager.LifeCycleHandler() {
380                @Override
381                public void beforeStop(ComponentManager mgr, boolean isStandby) {
382                    for (String id : workQueueConfig.getQueueIds()) {
383                        deactivateQueue(workQueueConfig.get(id));
384                    }
385                    try {
386                        if (!shutdown(10, TimeUnit.SECONDS)) {
387                            log.error("Some processors are still active");
388                        }
389                    } catch (InterruptedException e) {
390                        Thread.currentThread().interrupt();
391                        throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
392                    }
393                }
394
395                @Override
396                public void afterStart(ComponentManager mgr, boolean isResume) {
397                    for (String id : workQueueConfig.getQueueIds()) {
398                        activateQueue(workQueueConfig.get(id));
399                    }
400                }
401
402                @Override
403                public void afterStop(ComponentManager mgr, boolean isStandby) {
404                    Framework.getRuntime().getComponentManager().removeListener(this);
405                }
406            });
407        }
408    }
409
410    protected WorkThreadPoolExecutor getExecutor(String queueId) {
411        if (!started) {
412            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
413                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
414                init();
415            } else {
416                throw new IllegalStateException("Work manager not started, could not access to executors");
417            }
418        }
419        WorkQueueDescriptor workQueueDescriptor;
420        synchronized (workQueueConfig) {
421            workQueueDescriptor = workQueueConfig.get(queueId);
422        }
423        if (workQueueDescriptor == null) {
424            throw new IllegalArgumentException("No such work queue: " + queueId);
425        }
426
427        return executors.get(queueId);
428    }
429
430    @Override
431    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
432        WorkThreadPoolExecutor executor = getExecutor(queueId);
433        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
434    }
435
436    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
437            throws InterruptedException {
438        // mark executors as shutting down
439        for (WorkThreadPoolExecutor executor : list) {
440            executor.shutdownAndSuspend();
441        }
442        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
443        // wait until threads termination
444        for (WorkThreadPoolExecutor executor : list) {
445            long t0 = System.currentTimeMillis();
446            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
447                return false;
448            }
449            timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS);
450        }
451        return true;
452    }
453
454    protected long remainingMillis(long t0, long delay) {
455        long d = System.currentTimeMillis() - t0;
456        if (d > delay) {
457            return 0;
458        }
459        return delay - d;
460    }
461
462    protected synchronized void removeExecutor(String queueId) {
463        executors.remove(queueId);
464    }
465
466    @Override
467    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
468        shutdownInProgress = true;
469        try {
470            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
471        } finally {
472            shutdownInProgress = false;
473            started = false;
474        }
475    }
476
477    /**
478     * A work instance and how to schedule it, for schedule-after-commit.
479     *
480     * @since 5.8
481     */
482    public class WorkScheduling implements Synchronization {
483        public final Work work;
484
485        public final Scheduling scheduling;
486
487        public WorkScheduling(Work work, Scheduling scheduling) {
488            this.work = work;
489            this.scheduling = scheduling;
490        }
491
492        @Override
493        public void beforeCompletion() {
494        }
495
496        @Override
497        public void afterCompletion(int status) {
498            if (status == Status.STATUS_COMMITTED) {
499                schedule(work, scheduling, false);
500            } else if (status == Status.STATUS_ROLLEDBACK) {
501                work.setWorkInstanceState(State.UNKNOWN);
502            } else {
503                throw new IllegalArgumentException("Unsupported transaction status " + status);
504            }
505        }
506    }
507
508    /**
509     * Creates non-daemon threads at normal priority.
510     */
511    private static class NamedThreadFactory implements ThreadFactory {
512
513        private final AtomicInteger threadNumber = new AtomicInteger();
514
515        private final ThreadGroup group;
516
517        private final String prefix;
518
519        public NamedThreadFactory(String prefix) {
520            SecurityManager sm = System.getSecurityManager();
521            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
522            this.prefix = prefix;
523        }
524
525        @Override
526        public Thread newThread(Runnable r) {
527            String name = prefix + threadNumber.incrementAndGet();
528            Thread thread = new Thread(group, r, name);
529            // do not set daemon
530            thread.setPriority(Thread.NORM_PRIORITY);
531            thread.setUncaughtExceptionHandler((t,
532                    e) -> LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e));
533            return thread;
534        }
535    }
536
537    /**
538     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
539     * <p>
540     * Completed tasks are passed to another queue.
541     * <p>
542     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
543     * persisted, etc).
544     *
545     * @since 5.6
546     */
547    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
548
549        protected final String queueId;
550
551        /**
552         * List of running Work instances, in order to be able to interrupt them if requested.
553         */
554        // @GuardedBy("itself")
555        protected final ConcurrentLinkedQueue<Work> running;
556
557        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
558        // Number of work scheduled by this instance
559        protected final Counter scheduledCount;
560
561        // Number of work currently running on this instance
562        protected final Counter runningCount;
563
564        // Number of work completed by this instance
565        protected final Counter completedCount;
566
567        protected final Timer workTimer;
568
569        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
570                NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
571            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
572            queueId = queue.queueId;
573            running = new ConcurrentLinkedQueue<>();
574            // init metrics
575            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
576            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
577            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
578            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
579        }
580
581        public int getScheduledOrRunningSize() {
582            int ret = 0;
583            for (String queueId : getWorkQueueIds()) {
584                ret += getQueueSize(queueId, null);
585            }
586            return ret;
587        }
588
589        @Override
590        public void execute(Runnable r) {
591            throw new UnsupportedOperationException("use other api");
592        }
593
594        /**
595         * Executes the given task sometime in the future.
596         *
597         * @param work the work to execute
598         * @see #execute(Runnable)
599         */
600        public void execute(Work work) {
601            scheduledCount.inc();
602            submit(work);
603        }
604
605        /**
606         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
607         * directly
608         */
609        protected void submit(Work work) throws RuntimeException {
610            queuing.workSchedule(queueId, work);
611        }
612
613        @Override
614        protected void beforeExecute(Thread t, Runnable r) {
615            Work work = WorkHolder.getWork(r);
616            if (isShutdown()) {
617                work.setWorkInstanceState(State.SCHEDULED);
618                queuing.workReschedule(queueId, work);
619                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
620            }
621            work.setWorkInstanceState(State.RUNNING);
622            queuing.workRunning(queueId, work);
623            running.add(work);
624            runningCount.inc();
625        }
626
627        @Override
628        protected void afterExecute(Runnable r, Throwable t) {
629            Work work = WorkHolder.getWork(r);
630            try {
631                if (work.isSuspending()) {
632                    log.trace(work + " is suspending, giving up");
633                    return;
634                }
635                if (isShutdown()) {
636                    log.trace("rescheduling " + work.getId(), t);
637                    work.setWorkInstanceState(State.SCHEDULED);
638                    queuing.workReschedule(queueId, work);
639                    return;
640                }
641                work.setWorkInstanceState(State.UNKNOWN);
642                queuing.workCompleted(queueId, work);
643            } finally {
644                running.remove(work);
645                runningCount.dec();
646                completedCount.inc();
647                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
648                completionSynchronizer.signalCompletedWork();
649            }
650        }
651
652        /**
653         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
654         *
655         * @throws InterruptedException
656         */
657        public void shutdownAndSuspend() throws InterruptedException {
658            try {
659                // don't consume the queue anymore
660                deactivateQueueMetrics(queueId);
661                queuing.setActive(queueId, false);
662                // suspend all running work
663                for (Work work : running) {
664                    work.setWorkInstanceSuspending();
665                    log.trace("suspending and rescheduling " + work.getId());
666                    work.setWorkInstanceState(State.SCHEDULED);
667                    queuing.workReschedule(queueId, work);
668                }
669                shutdownNow();
670            } finally {
671                executors.remove(queueId);
672            }
673        }
674
675        public void removeScheduled(String workId) {
676            queuing.removeScheduled(queueId, workId);
677        }
678
679    }
680
681    @Override
682    public void schedule(Work work) {
683        schedule(work, Scheduling.ENQUEUE, false);
684    }
685
686    @Override
687    public void schedule(Work work, boolean afterCommit) {
688        schedule(work, Scheduling.ENQUEUE, afterCommit);
689    }
690
691    @Override
692    public void schedule(Work work, Scheduling scheduling) {
693        schedule(work, scheduling, false);
694    }
695
696    @Override
697    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
698        String workId = work.getId();
699        String queueId = getCategoryQueueId(work.getCategory());
700        if (!isQueuingEnabled(queueId)) {
701            return;
702        }
703        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
704            return;
705        }
706        work.setWorkInstanceState(State.SCHEDULED);
707        WorkSchedulePath.newInstance(work);
708        switch (scheduling) {
709        case ENQUEUE:
710            break;
711        case CANCEL_SCHEDULED:
712            getExecutor(queueId).removeScheduled(workId);
713            break;
714        case IF_NOT_SCHEDULED:
715        case IF_NOT_RUNNING_OR_SCHEDULED:
716            // TODO disabled for now because hasWorkInState uses isScheduled
717            // which is buggy
718            boolean disabled = Boolean.TRUE.booleanValue();
719            if (!disabled && hasWorkInState(workId, scheduling.state)) {
720                if (log.isDebugEnabled()) {
721                    log.debug("Canceling schedule because found: " + scheduling);
722                }
723                return;
724
725            }
726            break;
727
728        }
729        queuing.workSchedule(queueId, work);
730    }
731
732    /**
733     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
734     *
735     * @since 5.8
736     */
737    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
738        TransactionManager transactionManager;
739        try {
740            transactionManager = TransactionHelper.lookupTransactionManager();
741        } catch (NamingException e) {
742            transactionManager = null;
743        }
744        if (transactionManager == null) {
745            if (log.isDebugEnabled()) {
746                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
747            }
748            return false;
749        }
750        try {
751            Transaction transaction = transactionManager.getTransaction();
752            if (transaction == null) {
753                if (log.isDebugEnabled()) {
754                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
755                }
756                return false;
757            }
758            int status = transaction.getStatus();
759            if (status == Status.STATUS_ACTIVE) {
760                if (log.isDebugEnabled()) {
761                    log.debug("Scheduling work after commit: " + work);
762                }
763                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
764                return true;
765            } else if (status == Status.STATUS_COMMITTED) {
766                // called in afterCompletion, we can schedule immediately
767                if (log.isDebugEnabled()) {
768                    log.debug("Scheduling work immediately: " + work);
769                }
770                return false;
771            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
772                if (log.isDebugEnabled()) {
773                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
774                }
775                return true;
776            } else {
777                if (log.isDebugEnabled()) {
778                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
779                            + work);
780                }
781                return false;
782            }
783        } catch (SystemException | RollbackException e) {
784            log.error("Cannot schedule after commit", e);
785            return false;
786        }
787    }
788
789    @Override
790    public Work find(String workId, State state) {
791        return queuing.find(workId, state);
792    }
793
794    /**
795     * @param state SCHEDULED, RUNNING or null for both
796     */
797    protected boolean hasWorkInState(String workId, State state) {
798        return queuing.isWorkInState(workId, state);
799    }
800
801    @Override
802    public State getWorkState(String workId) {
803        return queuing.getWorkState(workId);
804    }
805
806    @Override
807    public List<Work> listWork(String queueId, State state) {
808        // don't return scheduled after commit
809        return queuing.listWork(queueId, state);
810    }
811
812    @Override
813    public List<String> listWorkIds(String queueId, State state) {
814        return queuing.listWorkIds(queueId, state);
815    }
816
817    @Override
818    public WorkQueueMetrics getMetrics(String queueId) {
819        return queuing.metrics(queueId);
820    }
821
822    @Override
823    public int getQueueSize(String queueId, State state) {
824        WorkQueueMetrics metrics = getMetrics(queueId);
825        if (state == null) {
826            return metrics.scheduled.intValue() + metrics.running.intValue();
827        }
828        if (state == State.SCHEDULED) {
829            return metrics.scheduled.intValue();
830        } else if (state == State.RUNNING) {
831            return metrics.running.intValue();
832        } else {
833            throw new IllegalArgumentException(String.valueOf(state));
834        }
835    }
836
837    @Override
838    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
839        return awaitCompletion(null, duration, unit);
840    }
841
842    @Override
843    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
844        if (!isStarted()) {
845            return true;
846        }
847        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
848        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
849        long deadline = getTimestampAfter(durationInMs);
850        int pause = (int) Math.min(durationInMs, 500L);
851        log.debug("awaitForCompletion " + durationInMs + " ms");
852        do {
853            if (noScheduledOrRunningWork(queueId)) {
854                completionSynchronizer.signalCompletedWork();
855                SequenceTracer.stop("done");
856                return true;
857            }
858            completionSynchronizer.waitForCompletedWork(pause);
859        } while (System.currentTimeMillis() < deadline);
860        log.info("awaitCompletion timeout after " + durationInMs + " ms");
861        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
862        return false;
863    }
864
865    protected long getTimestampAfter(long durationInMs) {
866        long ret = System.currentTimeMillis() + durationInMs;
867        if (ret < 0) {
868            ret = Long.MAX_VALUE;
869        }
870        return ret;
871    }
872
873    protected boolean noScheduledOrRunningWork(String queueId) {
874        if (queueId == null) {
875            for (String id : getWorkQueueIds()) {
876                if (!noScheduledOrRunningWork(id)) {
877                    return false;
878                }
879            }
880            return true;
881        }
882        if (!isProcessingEnabled(queueId)) {
883            return getExecutor(queueId).runningCount.getCount() == 0L;
884        }
885        if (getQueueSize(queueId, null) > 0) {
886            if (log.isTraceEnabled()) {
887                log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) + ", running: "
888                        + getQueueSize(queueId, State.RUNNING));
889            }
890            return false;
891        }
892        if (log.isTraceEnabled()) {
893            log.trace(queueId + " is completed");
894        }
895        return true;
896    }
897
898    @Override
899    public boolean isStarted() {
900        return started && !shutdownInProgress;
901    }
902
903}