001/*
002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 *     Benoit Delbosc
017 */
018package org.nuxeo.ecm.core.work;
019
020import java.lang.reflect.Constructor;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.BlockingQueue;
029import java.util.concurrent.RejectedExecutionHandler;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.ecm.core.event.EventServiceComponent;
048import org.nuxeo.ecm.core.work.api.Work;
049import org.nuxeo.ecm.core.work.api.Work.State;
050import org.nuxeo.ecm.core.work.api.WorkManager;
051import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
052import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor;
053import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
054import org.nuxeo.runtime.RuntimeServiceEvent;
055import org.nuxeo.runtime.RuntimeServiceListener;
056import org.nuxeo.runtime.api.Framework;
057import org.nuxeo.runtime.metrics.MetricsService;
058import org.nuxeo.runtime.model.ComponentContext;
059import org.nuxeo.runtime.model.ComponentInstance;
060import org.nuxeo.runtime.model.DefaultComponent;
061import org.nuxeo.runtime.transaction.TransactionHelper;
062
063import com.codahale.metrics.Counter;
064import com.codahale.metrics.MetricRegistry;
065import com.codahale.metrics.SharedMetricRegistries;
066import com.codahale.metrics.Timer;
067
068/**
069 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
070 * implementation.
071 *
072 * @since 5.6
073 */
074public class WorkManagerImpl extends DefaultComponent implements WorkManager {
075
    /**
     * Name constant for this service (presumably the name the component is registered under — confirm against the
     * component descriptor).
     */
    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);

    /** Extension point receiving {@link WorkQueueDescriptor} contributions. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point receiving {@link WorkQueuingImplDescriptor} contributions. */
    protected static final String IMPL_EP = "implementation";

    /** Queue id returned by {@link #getCategoryQueueId} when no queue is found for a category. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category used by {@link #getCategoryQueueId} when the given category is {@code null}. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of the names given to the worker threads; the queue id and a counter are appended. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /** Shared metrics registry in which the per-queue counters and timer are created. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // @GuardedBy("itself")
    protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this);

    // used synchronized
    // maps a queue id to the executor created for it when the queue is activated
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Global synchronizer (id "all"); the per-queue synchronizers forward their signals to it. */
    protected final WorkCompletionSynchronizer completionSynchronizer = new WorkCompletionSynchronizer("all");

    /** Current queuing implementation; in-memory until another one is registered. */
    protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class);
101
102    @Override
103    public void activate(ComponentContext context) {
104        Framework.addListener(new ShutdownListener());
105    }
106
107    @Override
108    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
109        if (QUEUES_EP.equals(extensionPoint)) {
110            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
111        } else if (IMPL_EP.equals(extensionPoint)) {
112            registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
113        } else {
114            throw new RuntimeException("Unknown extension point: " + extensionPoint);
115        }
116    }
117
118    @Override
119    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
120        if (QUEUES_EP.equals(extensionPoint)) {
121            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
122        } else if (IMPL_EP.equals(extensionPoint)) {
123            unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
124        } else {
125            throw new RuntimeException("Unknown extension point: " + extensionPoint);
126        }
127    }
128
129    public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
130        String queueId = workQueueDescriptor.id;
131        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
132            Boolean processing = workQueueDescriptor.processing;
133            Boolean queuing = workQueueDescriptor.queuing;
134            if (processing == null && queuing == null) {
135                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
136                        + " with no processing/queuing");
137                return;
138            }
139            String what = processing == null ? "" : (" processing=" + processing);
140            what += queuing == null ? "" : (" queuing=" + queuing);
141            log.info("Setting on all work queues:" + what);
142            // activate/deactivate processing/queuing on all queues
143            List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy
144            for (String id : queueIds) {
145                // add an updated contribution redefining processing/queuing
146                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
147                wqd.id = id;
148                wqd.processing = processing;
149                wqd.queuing = queuing;
150                registerWorkQueueDescriptor(wqd);
151            }
152            return;
153        }
154        workQueueDescriptors.addContribution(workQueueDescriptor);
155        WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId);
156        log.info("Registered work queue " + queueId + " " + wqd.toString());
157    }
158
159    public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
160        String id = workQueueDescriptor.id;
161        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
162            return;
163        }
164        workQueueDescriptors.removeContribution(workQueueDescriptor);
165        log.info("Unregistered work queue " + id);
166    }
167
168    protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) {
169        String id = workQueueDescriptor.id;
170        WorkThreadPoolExecutor executor = executors.get(id);
171        if (executor == null) {
172            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-");
173            int maxPoolSize = workQueueDescriptor.getMaxThreads();
174            executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory);
175            // prestart all core threads so that direct additions to the queue
176            // (from another Nuxeo instance) can be seen
177            executor.prestartAllCoreThreads();
178            executors.put(id, executor);
179        }
180        NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue();
181        // get merged contrib
182        // set active state
183        queue.setActive(workQueueDescriptor.isProcessingEnabled());
184        log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString());
185    }
186
187    public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) {
188        if (WorkQueueDescriptor.ALL_QUEUES.equals(workQueueDescriptor.id)) {
189            return;
190        }
191        WorkThreadPoolExecutor executor = executors.get(workQueueDescriptor.id);
192        executor.shutdownAndSuspend();
193        log.info("Deactivated work queue " + workQueueDescriptor.id);
194    }
195
196    public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
197        WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass());
198        registerWorkQueuing(q);
199    }
200
201    public void registerWorkQueuing(WorkQueuing q) {
202        closeQueuing();
203        queuing = q;
204    }
205
206    public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
207        unregisterWorkQueing();
208    }
209
210    public void unregisterWorkQueing() {
211        closeQueuing();
212        queuing = newWorkQueuing(MemoryWorkQueuing.class);
213    }
214
215    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
216        WorkQueuing q;
217        try {
218            Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class,
219                    WorkQueueDescriptorRegistry.class);
220            q = ctor.newInstance(this, workQueueDescriptors);
221        } catch (ReflectiveOperationException | SecurityException e) {
222            throw new RuntimeException(e);
223        }
224        return q;
225    }
226
227    protected void closeQueuing() {
228        try {
229            shutdown(5, TimeUnit.SECONDS);
230        } catch (InterruptedException e) {
231            Thread.currentThread().interrupt(); // restore interrupted status
232            throw new RuntimeException(e);
233        }
234    }
235
236    protected boolean isQueuingEnabled(String queueId) {
237        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
238        return wqd == null ? false : wqd.isQueuingEnabled();
239    }
240
241    protected boolean isProcessingEnabled(String queueId) {
242        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
243        return wqd == null ? false : wqd.isProcessingEnabled();
244    }
245
246    // ----- WorkManager -----
247
248    @Override
249    public List<String> getWorkQueueIds() {
250        synchronized (workQueueDescriptors) {
251            return workQueueDescriptors.getQueueIds();
252        }
253    }
254
255    @Override
256    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
257        synchronized (workQueueDescriptors) {
258            return workQueueDescriptors.get(queueId);
259        }
260    }
261
262    @Override
263    public String getCategoryQueueId(String category) {
264        if (category == null) {
265            category = DEFAULT_CATEGORY;
266        }
267        String queueId = workQueueDescriptors.getQueueId(category);
268        if (queueId == null) {
269            queueId = DEFAULT_QUEUE_ID;
270        }
271        return queueId;
272    }
273
274    @Override
275    public int getApplicationStartedOrder() {
276        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
277    }
278
279    @Override
280    public void applicationStarted(ComponentContext context) {
281        init();
282    }
283
284    protected volatile boolean started = false;
285
286    @Override
287    public void init() {
288        if (started) {
289            return;
290        }
291        synchronized (this) {
292            if (started) {
293                return;
294            }
295            started = true;
296            queuing.init();
297            for (String id : workQueueDescriptors.getQueueIds()) {
298                activateQueue(workQueueDescriptors.get(id));
299            }
300        }
301    }
302
303    protected WorkThreadPoolExecutor getExecutor(String queueId) {
304        if (!started) {
305            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
306                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
307                init();
308            } else {
309                throw new IllegalStateException("Work manager not started, could not access to executors");
310            }
311        }
312        WorkQueueDescriptor workQueueDescriptor;
313        synchronized (workQueueDescriptors) {
314            workQueueDescriptor = workQueueDescriptors.get(queueId);
315        }
316        if (workQueueDescriptor == null) {
317            throw new IllegalArgumentException("No such work queue: " + queueId);
318        }
319
320        return executors.get(queueId);
321    }
322
323    @Override
324    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
325        WorkThreadPoolExecutor executor = getExecutor(queueId);
326        boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit);
327        removeExecutor(queueId); // start afresh
328        return terminated;
329    }
330
331    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
332            throws InterruptedException {
333        // mark executors as shutting down
334        for (WorkThreadPoolExecutor executor : list) {
335            executor.shutdownAndSuspend();
336        }
337
338        long t0 = System.currentTimeMillis();
339        long delay = unit.toMillis(timeout);
340
341        // wait for termination or suspension
342        boolean terminated = true;
343        for (WorkThreadPoolExecutor executor : list) {
344            long remaining = remainingMillis(t0, delay);
345            if (!executor.awaitTerminationOrSave(remaining, TimeUnit.MILLISECONDS)) {
346                terminated = false;
347                // hard shutdown for remaining tasks
348                executor.shutdownNow();
349            }
350        }
351
352        return terminated;
353    }
354
355    protected long remainingMillis(long t0, long delay) {
356        long d = System.currentTimeMillis() - t0;
357        if (d > delay) {
358            return 0;
359        }
360        return delay - d;
361    }
362
363    protected synchronized void removeExecutor(String queueId) {
364        executors.remove(queueId);
365    }
366
367    @Override
368    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
369        List<WorkThreadPoolExecutor> executorList = new ArrayList<>(executors.values());
370        executors.clear();
371        started = false;
372        return shutdownExecutors(executorList, timeout, unit);
373    }
374
375    protected class ShutdownListener implements RuntimeServiceListener {
376        @Override
377        public void handleEvent(RuntimeServiceEvent event) {
378            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
379                return;
380            }
381            Framework.removeListener(this);
382            closeQueuing();
383        }
384    }
385
386    /**
387     * A work instance and how to schedule it, for schedule-after-commit.
388     *
389     * @since 5.8
390     */
391    public class WorkScheduling implements Synchronization {
392        public final Work work;
393
394        public final Scheduling scheduling;
395
396        public WorkScheduling(Work work, Scheduling scheduling) {
397            this.work = work;
398            this.scheduling = scheduling;
399        }
400
401        @Override
402        public void beforeCompletion() {
403        }
404
405        @Override
406        public void afterCompletion(int status) {
407            if (status == Status.STATUS_COMMITTED) {
408                schedule(work, scheduling, false);
409            } else if (status == Status.STATUS_ROLLEDBACK) {
410                work.setWorkInstanceState(State.CANCELED);
411            } else {
412                throw new IllegalArgumentException("Unsupported transaction status " + status);
413            }
414        }
415    }
416
417    /**
418     * Creates non-daemon threads at normal priority.
419     */
420    private static class NamedThreadFactory implements ThreadFactory {
421
422        private final AtomicInteger threadNumber = new AtomicInteger();
423
424        private final ThreadGroup group;
425
426        private final String prefix;
427
428        public NamedThreadFactory(String prefix) {
429            SecurityManager sm = System.getSecurityManager();
430            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
431            this.prefix = prefix;
432        }
433
434        @Override
435        public Thread newThread(Runnable r) {
436            String name = prefix + threadNumber.incrementAndGet();
437            Thread thread = new Thread(group, r, name);
438            // do not set daemon
439            thread.setPriority(Thread.NORM_PRIORITY);
440            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
441
442                @Override
443                public void uncaughtException(Thread t, Throwable e) {
444                    LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e);
445                }
446            });
447            return thread;
448        }
449    }
450
451    /**
452     * A handler for rejected tasks that discards them.
453     */
454    public static class CancelingPolicy implements RejectedExecutionHandler {
455
456        public static final CancelingPolicy INSTANCE = new CancelingPolicy();
457
458        @Override
459        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
460            ((WorkThreadPoolExecutor) executor).removedFromQueue(r);
461        }
462    }
463
464    public class WorkCompletionSynchronizer {
465
466        protected final AtomicInteger scheduledOrRunning = new AtomicInteger(0);
467
468        protected final ReentrantLock completionLock = new ReentrantLock();
469
470        protected final Condition completion = completionLock.newCondition();
471
472        @SuppressWarnings("hiding")
473        protected final Log log = LogFactory.getLog(WorkCompletionSynchronizer.class);
474
475        protected final String queueid;
476
477        protected WorkCompletionSynchronizer(String id) {
478            queueid = id;
479        }
480
481        protected long await(long timeout) throws InterruptedException {
482            completionLock.lock();
483            try {
484                while (timeout > 0 && scheduledOrRunning.get() > 0) {
485                    timeout = completion.awaitNanos(timeout);
486                }
487            } finally {
488                if (log.isTraceEnabled()) {
489                    log.trace("returning from await");
490                }
491                completionLock.unlock();
492            }
493            return timeout;
494        }
495
496        protected void signalSchedule() {
497            int value = scheduledOrRunning.incrementAndGet();
498            if (log.isTraceEnabled()) {
499                logScheduleAndRunning("scheduled", value);
500            }
501            if (completionSynchronizer != this) {
502                completionSynchronizer.signalSchedule();
503            }
504        }
505
506        protected void signalCompletion() {
507            final int value = scheduledOrRunning.decrementAndGet();
508            if (value == 0) {
509                completionLock.lock();
510                try {
511                    completion.signalAll();
512                } finally {
513                    completionLock.unlock();
514                }
515            }
516            if (log.isTraceEnabled()) {
517                logScheduleAndRunning("completed", value);
518            }
519            if (completionSynchronizer != this) {
520                completionSynchronizer.signalCompletion();
521            }
522        }
523
524        protected void logScheduleAndRunning(String event, int value) {
525            log.trace(event + " [" + queueid + "," + value + "]", new Throwable("stack trace"));
526        }
527
528    }
529
530    /**
531     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
532     * <p>
533     * Completed tasks are passed to another queue.
534     * <p>
535     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
536     * persisted, etc).
537     *
538     * @since 5.6
539     */
540    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
541
542        protected final String queueId;
543
544        protected final WorkCompletionSynchronizer completionSynchronizer;
545
546        /**
547         * List of running Work instances, in order to be able to interrupt them if requested.
548         */
549        // @GuardedBy("itself")
550        protected final List<Work> running;
551
552        // metrics
553
554        protected final Counter scheduledCount;
555
556        protected final Counter scheduledMax;
557
558        protected final Counter runningCount;
559
560        protected final Counter completedCount;
561
562        protected final Timer workTimer;
563
564        protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime,
565                TimeUnit unit, ThreadFactory threadFactory) {
566            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initScheduleQueue(queueId), threadFactory);
567            this.queueId = queueId;
568            completionSynchronizer = new WorkCompletionSynchronizer(queueId);
569            running = new LinkedList<Work>();
570            // init metrics
571            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
572            scheduledMax = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "max"));
573            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
574            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
575            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
576        }
577
578        public int getScheduledOrRunningSize() {
579            return completionSynchronizer.scheduledOrRunning.get();
580        }
581
582        @Override
583        public void execute(Runnable r) {
584            throw new UnsupportedOperationException("use other api");
585        }
586
587        /**
588         * Executes the given task sometime in the future.
589         *
590         * @param work the work to execute
591         * @see #execute(Runnable)
592         */
593        public void execute(Work work) {
594            scheduledCount.inc();
595            if (scheduledCount.getCount() > scheduledMax.getCount()) {
596                scheduledMax.inc();
597            }
598            completionSynchronizer.signalSchedule();
599            boolean ok = false;
600            try {
601                submit(work);
602                ok = true;
603            } finally {
604                if (!ok) {
605                    completionSynchronizer.signalCompletion();
606                }
607            }
608        }
609
610        /**
611         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
612         * directly
613         *
614         * @param work
615         * @throws RuntimeException
616         */
617        protected void submit(Work work) throws RuntimeException {
618            BlockingQueue<Runnable> queue = queuing.getScheduledQueue(queueId);
619            boolean added = queue.offer(new WorkHolder(work));
620            if (!added) {
621                throw new RuntimeException("queue should have blocked");
622            }
623            // DO NOT super.execute(new WorkHolder(work));
624        }
625
626        @Override
627        protected void beforeExecute(Thread t, Runnable r) {
628            Work work = WorkHolder.getWork(r);
629            work.setWorkInstanceState(State.RUNNING);
630            queuing.workRunning(queueId, work);
631            synchronized (running) {
632                running.add(work);
633            }
634            // metrics
635            scheduledCount.dec();
636            runningCount.inc();
637        }
638
639        @Override
640        protected void afterExecute(Runnable r, Throwable t) {
641            try {
642                Work work = WorkHolder.getWork(r);
643                synchronized (running) {
644                    running.remove(work);
645                }
646                State state;
647                if (t == null) {
648                    if (work.isWorkInstanceSuspended()) {
649                        state = State.SCHEDULED;
650                    } else {
651                        state = State.COMPLETED;
652                    }
653                } else {
654                    state = State.FAILED;
655                }
656                work.setWorkInstanceState(state);
657                queuing.workCompleted(queueId, work);
658                // metrics
659                runningCount.dec();
660                completedCount.inc();
661                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
662            } finally {
663                completionSynchronizer.signalCompletion();
664            }
665        }
666
667        // called during shutdown
668        // with tasks from the queue if new tasks are submitted
669        // or with tasks drained from the queue
670        protected void removedFromQueue(Runnable r) {
671            Work work = WorkHolder.getWork(r);
672            work.setWorkInstanceState(State.CANCELED);
673            completionSynchronizer.signalCompletion();
674        }
675
676        /**
677         * Initiates a shutdown of this executor and asks for work instances to suspend themselves. The scheduled work
678         * instances are drained and suspended.
679         */
680        public void shutdownAndSuspend() {
681            // rejected tasks will be discarded
682            setRejectedExecutionHandler(CancelingPolicy.INSTANCE);
683            // shutdown the executor
684            // if a new task is scheduled it will be rejected -> discarded
685            shutdown();
686            // request all scheduled work instances to suspend (cancel)
687            int n = queuing.setSuspending(queueId);
688            completionSynchronizer.scheduledOrRunning.addAndGet(-n);
689            // request all running work instances to suspend (stop)
690            synchronized (running) {
691                for (Work work : running) {
692                    work.setWorkInstanceSuspending();
693                }
694            }
695        }
696
697        /**
698         * Blocks until all work instances have completed after a shutdown and suspend request.
699         *
700         * @param timeout the time to wait
701         * @param unit the timeout unit
702         * @return true if all work stopped or was saved, false if some remaining after timeout
703         */
704        public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException {
705            boolean terminated = super.awaitTermination(timeout, unit);
706            if (!terminated) {
707                // drain queue from remaining scheduled work
708                List<Runnable> drained = new ArrayList<>();
709                getQueue().drainTo(drained);
710                for (Runnable r : drained) {
711                    removedFromQueue(r);
712                }
713            }
714            // some work still remaining after timeout
715            return terminated;
716        }
717
718        public Work removeScheduled(String workId) {
719            Work w = queuing.removeScheduled(queueId, workId);
720            if (w != null) {
721                completionSynchronizer.signalCompletion();
722            }
723            return w;
724        }
725
726    }
727
728    @Override
729    public void schedule(Work work) {
730        schedule(work, Scheduling.ENQUEUE, false);
731    }
732
733    @Override
734    public void schedule(Work work, boolean afterCommit) {
735        schedule(work, Scheduling.ENQUEUE, afterCommit);
736    }
737
738    @Override
739    public void schedule(Work work, Scheduling scheduling) {
740        schedule(work, scheduling, false);
741    }
742
743    @Override
744    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
745        String workId = work.getId();
746        String queueId = getCategoryQueueId(work.getCategory());
747        if (!isQueuingEnabled(queueId)) {
748            work.setWorkInstanceState(State.CANCELED);
749            return;
750        }
751        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
752            return;
753        }
754        work.setWorkInstanceState(State.SCHEDULED);
755        WorkSchedulePath.newInstance(work);
756        if (log.isTraceEnabled()) {
757            log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack());
758        } else if (log.isDebugEnabled()) {
759            log.debug("Scheduling work: " + work + " using queue: " + queueId);
760        }
761        switch (scheduling) {
762        case ENQUEUE:
763            break;
764        case CANCEL_SCHEDULED:
765            Work w = getExecutor(queueId).removeScheduled(workId);
766            if (w != null) {
767                w.setWorkInstanceState(State.CANCELED);
768                if (log.isDebugEnabled()) {
769                    log.debug("Canceling existing scheduled work before scheduling ("
770                            + completionSynchronizer.scheduledOrRunning.get() + ")");
771                }
772            }
773            break;
774        case IF_NOT_SCHEDULED:
775        case IF_NOT_RUNNING:
776        case IF_NOT_RUNNING_OR_SCHEDULED:
777            // TODO disabled for now because hasWorkInState uses isScheduled
778            // which is buggy
779            boolean disabled = Boolean.TRUE.booleanValue();
780            if (!disabled && hasWorkInState(workId, scheduling.state)) {
781                // mark passed work as canceled
782                work.setWorkInstanceState(State.CANCELED);
783                if (log.isDebugEnabled()) {
784                    log.debug("Canceling schedule because found: " + scheduling);
785                }
786                return;
787
788            }
789            break;
790
791        }
792        getExecutor(queueId).execute(work);
793    }
794
795    /**
796     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
797     *
798     * @since 5.8
799     */
800    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
801        TransactionManager transactionManager;
802        try {
803            transactionManager = TransactionHelper.lookupTransactionManager();
804        } catch (NamingException e) {
805            transactionManager = null;
806        }
807        if (transactionManager == null) {
808            if (log.isDebugEnabled()) {
809                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
810            }
811            return false;
812        }
813        try {
814            Transaction transaction = transactionManager.getTransaction();
815            if (transaction == null) {
816                if (log.isDebugEnabled()) {
817                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
818                }
819                return false;
820            }
821            int status = transaction.getStatus();
822            if (status == Status.STATUS_ACTIVE) {
823                if (log.isDebugEnabled()) {
824                    log.debug("Scheduling work after commit: " + work);
825                }
826                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
827                return true;
828            } else {
829                if (log.isDebugEnabled()) {
830                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
831                            + work);
832                }
833                return false;
834            }
835        } catch (SystemException | RollbackException e) {
836            log.error("Cannot schedule after commit", e);
837            return false;
838        }
839    }
840
841    @Override
842    @Deprecated
843    public Work find(Work work, State state, boolean useEquals, int[] pos) {
844        if (pos != null) {
845            pos[0] = 0; // compat
846        }
847        String workId = work.getId();
848        return queuing.find(workId, state);
849    }
850
851    @Override
852    public Work find(String workId, State state) {
853        return queuing.find(workId, state);
854    }
855
856    @Override
857    public String findResult(String workId) {
858        Work work = find(workId, State.COMPLETED);
859        return work != null ? work.getWorkInstanceResult() : null;
860    }
861
862    /**
863     * @param state SCHEDULED, RUNNING or null for both
864     */
865    protected boolean hasWorkInState(String workId, State state) {
866        return queuing.isWorkInState(workId, state);
867    }
868
869    @Override
870    public State getWorkState(String workId) {
871        return queuing.getWorkState(workId);
872    }
873
874    @Override
875    public List<Work> listWork(String queueId, State state) {
876        // don't return scheduled after commit
877        return queuing.listWork(queueId, state);
878    }
879
880    @Override
881    public List<String> listWorkIds(String queueId, State state) {
882        return queuing.listWorkIds(queueId, state);
883    }
884
885    @Override
886    public int getQueueSize(String queueId, State state) {
887        if (state == null) {
888            return getScheduledOrRunningSize(queueId);
889        }
890        if (state == State.SCHEDULED) {
891            return getScheduledSize(queueId);
892        } else if (state == State.RUNNING) {
893            return getRunningSize(queueId);
894        } else if (state == State.COMPLETED) {
895            return getCompletedSize(queueId);
896        } else {
897            throw new IllegalArgumentException(String.valueOf(state));
898        }
899    }
900
901    @Override
902    @Deprecated
903    public int getNonCompletedWorkSize(String queueId) {
904        return getScheduledOrRunningSize(queueId);
905    }
906
907    protected int getScheduledSize(String queueId) {
908        return queuing.getQueueSize(queueId, State.SCHEDULED);
909    }
910
911    protected int getRunningSize(String queueId) {
912        return queuing.getQueueSize(queueId, State.RUNNING);
913    }
914
915    protected int getScheduledOrRunningSize(String queueId) {
916        // check the thread pool directly, this avoids race conditions
917        // because queuing.getRunningSize takes a bit of time to be
918        // accurate after a work is scheduled
919        return getExecutor(queueId).getScheduledOrRunningSize();
920    }
921
922    protected int getCompletedSize(String queueId) {
923        return queuing.getQueueSize(queueId, State.COMPLETED);
924    }
925
926    @Override
927    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
928        return getExecutor(queueId).completionSynchronizer.await(unit.toNanos(duration)) > 0;
929    }
930
931    @Override
932    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
933        return completionSynchronizer.await(unit.toNanos(duration)) > 0;
934    }
935
    /**
     * Clears the completed work of a single queue by delegating to the queuing implementation with a completion
     * time of 0.
     * <p>
     * NOTE(review): a completion time of 0 presumably means "no time limit, clear everything" -- confirm against the
     * queuing implementation.
     *
     * @param queueId the id of the queue whose completed work is cleared
     */
    @Override
    public synchronized void clearCompletedWork(String queueId) {
        queuing.clearCompletedWork(queueId, 0);
    }
940
941    @Override
942    public synchronized void clearCompletedWork(long completionTime) {
943        for (String queueId : queuing.getCompletedQueueIds()) {
944            queuing.clearCompletedWork(queueId, completionTime);
945        }
946    }
947
948    @Override
949    public synchronized void cleanup() {
950        log.debug("Clearing old completed work");
951        for (String queueId : queuing.getCompletedQueueIds()) {
952            WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId);
953            if (workQueueDescriptor == null) {
954                // unknown queue
955                continue;
956            }
957            long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L;
958            if (delay > 0) {
959                long completionTime = System.currentTimeMillis() - delay;
960                queuing.clearCompletedWork(queueId, completionTime);
961            }
962        }
963    }
964
    /**
     * Returns the current value of the {@code started} field of this work manager.
     */
    @Override
    public boolean isStarted() {
        return started;
    }
969
970}