001/*
002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import com.codahale.metrics.Counter;
023import com.codahale.metrics.MetricRegistry;
024import com.codahale.metrics.SharedMetricRegistries;
025import com.codahale.metrics.Timer;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.common.logging.SequenceTracer;
029import org.nuxeo.ecm.core.event.EventServiceComponent;
030import org.nuxeo.ecm.core.work.api.Work;
031import org.nuxeo.ecm.core.work.api.Work.State;
032import org.nuxeo.ecm.core.work.api.WorkManager;
033import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
034import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor;
035import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
036import org.nuxeo.runtime.RuntimeServiceEvent;
037import org.nuxeo.runtime.RuntimeServiceListener;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.metrics.MetricsService;
040import org.nuxeo.runtime.model.ComponentContext;
041import org.nuxeo.runtime.model.ComponentInstance;
042import org.nuxeo.runtime.model.DefaultComponent;
043import org.nuxeo.runtime.transaction.TransactionHelper;
044
045import java.lang.reflect.Constructor;
046import java.util.ArrayList;
047import java.util.Collection;
048import java.util.Collections;
049import java.util.HashMap;
050import java.util.LinkedList;
051import java.util.List;
052import java.util.Map;
053import java.util.concurrent.BlockingQueue;
054import java.util.concurrent.RejectedExecutionHandler;
055import java.util.concurrent.ThreadFactory;
056import java.util.concurrent.ThreadPoolExecutor;
057import java.util.concurrent.TimeUnit;
058import java.util.concurrent.atomic.AtomicInteger;
059import java.util.concurrent.locks.Condition;
060import java.util.concurrent.locks.ReentrantLock;
061
062import javax.naming.NamingException;
063import javax.transaction.RollbackException;
064import javax.transaction.Status;
065import javax.transaction.Synchronization;
066import javax.transaction.SystemException;
067import javax.transaction.Transaction;
068import javax.transaction.TransactionManager;
069
070/**
071 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
072 * implementation.
073 *
074 * @since 5.6
075 */
076public class WorkManagerImpl extends DefaultComponent implements WorkManager {
077
078    public static final String NAME = "org.nuxeo.ecm.core.work.service";
079
080    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);
081
082    protected static final String QUEUES_EP = "queues";
083
084    protected static final String IMPL_EP = "implementation";
085
086    public static final String DEFAULT_QUEUE_ID = "default";
087
088    public static final String DEFAULT_CATEGORY = "default";
089
090    protected static final String THREAD_PREFIX = "Nuxeo-Work-";
091
092    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
093
094    // @GuardedBy("itself")
095    protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this);
096
097    // used synchronized
098    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();
099
100    protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class);
101
102    /**
103     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from
104     * another node in cluster mode.
105     */
106    protected class WorkCompletionSynchronizer {
107        final protected ReentrantLock lock = new ReentrantLock();
108        final protected Condition condition = lock.newCondition();
109
110        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
111            lock.lock();
112            try {
113                return condition.await(timeMs, TimeUnit.MILLISECONDS);
114            } finally {
115                lock.unlock();
116            }
117        }
118
119        protected void signalCompletedWork() {
120            lock.lock();
121            try {
122                condition.signalAll();
123            } finally {
124                lock.unlock();
125            }
126        }
127
128    }
129
130    protected WorkCompletionSynchronizer completionSynchronizer = new WorkCompletionSynchronizer();
131
132    @Override
133    public void activate(ComponentContext context) {
134        Framework.addListener(new ShutdownListener());
135    }
136
137    @Override
138    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
139        if (QUEUES_EP.equals(extensionPoint)) {
140            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
141        } else if (IMPL_EP.equals(extensionPoint)) {
142            registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
143        } else {
144            throw new RuntimeException("Unknown extension point: " + extensionPoint);
145        }
146    }
147
148    @Override
149    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
150        if (QUEUES_EP.equals(extensionPoint)) {
151            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
152        } else if (IMPL_EP.equals(extensionPoint)) {
153            unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
154        } else {
155            throw new RuntimeException("Unknown extension point: " + extensionPoint);
156        }
157    }
158
159    public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
160        String queueId = workQueueDescriptor.id;
161        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
162            Boolean processing = workQueueDescriptor.processing;
163            Boolean queuing = workQueueDescriptor.queuing;
164            if (processing == null && queuing == null) {
165                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
166                        + " with no processing/queuing");
167                return;
168            }
169            String what = processing == null ? "" : (" processing=" + processing);
170            what += queuing == null ? "" : (" queuing=" + queuing);
171            log.info("Setting on all work queues:" + what);
172            // activate/deactivate processing/queuing on all queues
173            List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy
174            for (String id : queueIds) {
175                // add an updated contribution redefining processing/queuing
176                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
177                wqd.id = id;
178                wqd.processing = processing;
179                wqd.queuing = queuing;
180                registerWorkQueueDescriptor(wqd);
181            }
182            return;
183        }
184        workQueueDescriptors.addContribution(workQueueDescriptor);
185        WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId);
186        log.info("Registered work queue " + queueId + " " + wqd.toString());
187    }
188
189    public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
190        String id = workQueueDescriptor.id;
191        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
192            return;
193        }
194        workQueueDescriptors.removeContribution(workQueueDescriptor);
195        log.info("Unregistered work queue " + id);
196    }
197
198    protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) {
199        String id = workQueueDescriptor.id;
200        WorkThreadPoolExecutor executor = executors.get(id);
201        if (executor == null) {
202            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-");
203            int maxPoolSize = workQueueDescriptor.getMaxThreads();
204            executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory);
205            // prestart all core threads so that direct additions to the queue
206            // (from another Nuxeo instance) can be seen
207            executor.prestartAllCoreThreads();
208            executors.put(id, executor);
209        }
210        NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue();
211        // get merged contrib
212        // set active state
213        queue.setActive(workQueueDescriptor.isProcessingEnabled());
214        log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString());
215    }
216
217    public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) {
218        if (WorkQueueDescriptor.ALL_QUEUES.equals(workQueueDescriptor.id)) {
219            return;
220        }
221        WorkThreadPoolExecutor executor = executors.get(workQueueDescriptor.id);
222        executor.shutdownAndSuspend();
223        log.info("Deactivated work queue " + workQueueDescriptor.id);
224    }
225
226    public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
227        WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass());
228        registerWorkQueuing(q);
229    }
230
231    public void registerWorkQueuing(WorkQueuing q) {
232        closeQueuing();
233        queuing = q;
234    }
235
236    public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
237        unregisterWorkQueing();
238    }
239
240    public void unregisterWorkQueing() {
241        closeQueuing();
242        queuing = newWorkQueuing(MemoryWorkQueuing.class);
243    }
244
245    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
246        WorkQueuing q;
247        try {
248            Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class,
249                    WorkQueueDescriptorRegistry.class);
250            q = ctor.newInstance(this, workQueueDescriptors);
251        } catch (ReflectiveOperationException | SecurityException e) {
252            throw new RuntimeException(e);
253        }
254        return q;
255    }
256
257    protected void closeQueuing() {
258        try {
259            shutdown(5, TimeUnit.SECONDS);
260        } catch (InterruptedException e) {
261            Thread.currentThread().interrupt(); // restore interrupted status
262            throw new RuntimeException(e);
263        }
264    }
265
266    protected boolean isQueuingEnabled(String queueId) {
267        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
268        return wqd == null ? false : wqd.isQueuingEnabled();
269    }
270
271    protected boolean isProcessingEnabled(String queueId) {
272        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
273        return wqd == null ? false : wqd.isProcessingEnabled();
274    }
275
276    // ----- WorkManager -----
277
278    @Override
279    public List<String> getWorkQueueIds() {
280        synchronized (workQueueDescriptors) {
281            return workQueueDescriptors.getQueueIds();
282        }
283    }
284
285    @Override
286    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
287        synchronized (workQueueDescriptors) {
288            return workQueueDescriptors.get(queueId);
289        }
290    }
291
292    @Override
293    public String getCategoryQueueId(String category) {
294        if (category == null) {
295            category = DEFAULT_CATEGORY;
296        }
297        String queueId = workQueueDescriptors.getQueueId(category);
298        if (queueId == null) {
299            queueId = DEFAULT_QUEUE_ID;
300        }
301        return queueId;
302    }
303
304    @Override
305    public int getApplicationStartedOrder() {
306        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
307    }
308
309    @Override
310    public void applicationStarted(ComponentContext context) {
311        init();
312    }
313
314    protected volatile boolean started = false;
315
316    protected volatile boolean shutdownInProgress = false;
317
318    @Override
319    public void init() {
320        if (started) {
321            return;
322        }
323        synchronized (this) {
324            if (started) {
325                return;
326            }
327            started = true;
328            queuing.init();
329            for (String id : workQueueDescriptors.getQueueIds()) {
330                activateQueue(workQueueDescriptors.get(id));
331            }
332        }
333    }
334
335    protected WorkThreadPoolExecutor getExecutor(String queueId) {
336        if (!started) {
337            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
338                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
339                init();
340            } else {
341                throw new IllegalStateException("Work manager not started, could not access to executors");
342            }
343        }
344        WorkQueueDescriptor workQueueDescriptor;
345        synchronized (workQueueDescriptors) {
346            workQueueDescriptor = workQueueDescriptors.get(queueId);
347        }
348        if (workQueueDescriptor == null) {
349            throw new IllegalArgumentException("No such work queue: " + queueId);
350        }
351
352        return executors.get(queueId);
353    }
354
355    @Override
356    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
357        WorkThreadPoolExecutor executor = getExecutor(queueId);
358        boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit);
359        removeExecutor(queueId); // start afresh
360        return terminated;
361    }
362
363    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
364            throws InterruptedException {
365        // mark executors as shutting down
366        for (WorkThreadPoolExecutor executor : list) {
367            executor.shutdownAndSuspend();
368        }
369
370        long t0 = System.currentTimeMillis();
371        long delay = unit.toMillis(timeout);
372
373        // wait for termination or suspension
374        boolean terminated = true;
375        for (WorkThreadPoolExecutor executor : list) {
376            long remaining = remainingMillis(t0, delay);
377            if (!executor.awaitTerminationOrSave(remaining, TimeUnit.MILLISECONDS)) {
378                terminated = false;
379                // hard shutdown for remaining tasks
380                executor.shutdownNow();
381            }
382        }
383
384        return terminated;
385    }
386
387    protected long remainingMillis(long t0, long delay) {
388        long d = System.currentTimeMillis() - t0;
389        if (d > delay) {
390            return 0;
391        }
392        return delay - d;
393    }
394
395    protected synchronized void removeExecutor(String queueId) {
396        executors.remove(queueId);
397    }
398
399    @Override
400    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
401        shutdownInProgress = true;
402        try {
403            return shutdownExecutors(executors.values(), timeout, unit);
404        } finally {
405            shutdownInProgress = false;
406            started = false;
407            executors.clear();
408        }
409    }
410
411    protected class ShutdownListener implements RuntimeServiceListener {
412        @Override
413        public void handleEvent(RuntimeServiceEvent event) {
414            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
415                return;
416            }
417            Framework.removeListener(this);
418            closeQueuing();
419        }
420    }
421
422    /**
423     * A work instance and how to schedule it, for schedule-after-commit.
424     *
425     * @since 5.8
426     */
427    public class WorkScheduling implements Synchronization {
428        public final Work work;
429
430        public final Scheduling scheduling;
431
432        public WorkScheduling(Work work, Scheduling scheduling) {
433            this.work = work;
434            this.scheduling = scheduling;
435        }
436
437        @Override
438        public void beforeCompletion() {
439        }
440
441        @Override
442        public void afterCompletion(int status) {
443            if (status == Status.STATUS_COMMITTED) {
444                schedule(work, scheduling, false);
445            } else if (status == Status.STATUS_ROLLEDBACK) {
446                work.setWorkInstanceState(State.CANCELED);
447            } else {
448                throw new IllegalArgumentException("Unsupported transaction status " + status);
449            }
450        }
451    }
452
453    /**
454     * Creates non-daemon threads at normal priority.
455     */
456    private static class NamedThreadFactory implements ThreadFactory {
457
458        private final AtomicInteger threadNumber = new AtomicInteger();
459
460        private final ThreadGroup group;
461
462        private final String prefix;
463
464        public NamedThreadFactory(String prefix) {
465            SecurityManager sm = System.getSecurityManager();
466            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
467            this.prefix = prefix;
468        }
469
470        @Override
471        public Thread newThread(Runnable r) {
472            String name = prefix + threadNumber.incrementAndGet();
473            Thread thread = new Thread(group, r, name);
474            // do not set daemon
475            thread.setPriority(Thread.NORM_PRIORITY);
476            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
477
478                @Override
479                public void uncaughtException(Thread t, Throwable e) {
480                    LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e);
481                }
482            });
483            return thread;
484        }
485    }
486
487    /**
488     * A handler for rejected tasks that discards them.
489     */
490    public static class CancelingPolicy implements RejectedExecutionHandler {
491
492        public static final CancelingPolicy INSTANCE = new CancelingPolicy();
493
494        @Override
495        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
496            ((WorkThreadPoolExecutor) executor).removedFromQueue(r);
497        }
498    }
499
500
501    /**
502     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
503     * <p>
504     * Completed tasks are passed to another queue.
505     * <p>
506     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
507     * persisted, etc).
508     *
509     * @since 5.6
510     */
511    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
512
513        protected final String queueId;
514
515        /**
516         * List of running Work instances, in order to be able to interrupt them if requested.
517         */
518        // @GuardedBy("itself")
519        protected final List<Work> running;
520
521        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
522        // Number of work scheduled by this instance
523        protected final Counter scheduledCount;
524
525        // Number of work currently running on this instance
526        protected final Counter runningCount;
527
528        // Number of work completed by this instance
529        protected final Counter completedCount;
530
531        protected final Timer workTimer;
532
533        protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime,
534                                         TimeUnit unit, ThreadFactory threadFactory) {
535            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initWorkQueue(queueId), threadFactory);
536            this.queueId = queueId;
537            running = new LinkedList<Work>();
538            // init metrics
539            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
540            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
541            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
542            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
543        }
544
545        public int getScheduledOrRunningSize() {
546            int ret = 0;
547            for (String queueId : getWorkQueueIds()) {
548                ret += getQueueSize(queueId, null);
549            }
550            return ret;
551        }
552
553        @Override
554        public void execute(Runnable r) {
555            throw new UnsupportedOperationException("use other api");
556        }
557
558        /**
559         * Executes the given task sometime in the future.
560         *
561         * @param work the work to execute
562         * @see #execute(Runnable)
563         */
564        public void execute(Work work) {
565            scheduledCount.inc();
566            submit(work);
567        }
568
569        /**
570         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
571         * directly
572         *
573         * @param work
574         * @throws RuntimeException
575         */
576        protected void submit(Work work) throws RuntimeException {
577            boolean added = queuing.workSchedule(queueId, work);
578            if (!added) {
579                queuing.removeScheduled(queueId, work.getId());
580                throw new RuntimeException("queue should have blocked");
581            }
582            // DO NOT super.execute(new WorkHolder(work));
583        }
584
585        @Override
586        protected void beforeExecute(Thread t, Runnable r) {
587            Work work = WorkHolder.getWork(r);
588            work.setWorkInstanceState(State.RUNNING);
589            queuing.workRunning(queueId, work);
590            synchronized (running) {
591                running.add(work);
592            }
593            // metrics
594            runningCount.inc();
595        }
596
597        @Override
598        protected void afterExecute(Runnable r, Throwable t) {
599            Work work = WorkHolder.getWork(r);
600            synchronized (running) {
601                running.remove(work);
602            }
603            State state;
604            if (t == null) {
605                if (work.isWorkInstanceSuspended()) {
606                    state = State.SCHEDULED;
607                } else {
608                    state = State.COMPLETED;
609                }
610            } else {
611                state = State.FAILED;
612            }
613            work.setWorkInstanceState(state);
614            queuing.workCompleted(queueId, work);
615            // metrics
616            runningCount.dec();
617            completedCount.inc();
618            workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
619            completionSynchronizer.signalCompletedWork();
620        }
621
622        // called during shutdown
623        // with tasks from the queue if new tasks are submitted
624        // or with tasks drained from the queue
625        protected void removedFromQueue(Runnable r) {
626            Work work = WorkHolder.getWork(r);
627            work.setWorkInstanceState(State.CANCELED);
628            completionSynchronizer.signalCompletedWork();
629        }
630
631        /**
632         * Initiates a shutdown of this executor and asks for work instances to suspend themselves. The scheduled work
633         * instances are drained and suspended.
634         */
635        public void shutdownAndSuspend() {
636            // rejected tasks will be discarded
637            setRejectedExecutionHandler(CancelingPolicy.INSTANCE);
638            // shutdown the executor
639            // if a new task is scheduled it will be rejected -> discarded
640            shutdown();
641            // request all scheduled work instances to suspend (cancel)
642            int n = queuing.setSuspending(queueId);
643            // request all running work instances to suspend (stop)
644            synchronized (running) {
645                for (Work work : running) {
646                    work.setWorkInstanceSuspending();
647                }
648            }
649        }
650
651        /**
652         * Blocks until all work instances have completed after a shutdown and suspend request.
653         *
654         * @param timeout the time to wait
655         * @param unit the timeout unit
656         * @return true if all work stopped or was saved, false if some remaining after timeout
657         */
658        public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException {
659            boolean terminated = super.awaitTermination(timeout, unit);
660            if (!terminated) {
661                // drain queue from remaining scheduled work
662                List<Runnable> drained = new ArrayList<>();
663                getQueue().drainTo(drained);
664                for (Runnable r : drained) {
665                    removedFromQueue(r);
666                }
667            }
668            // some work still remaining after timeout
669            return terminated;
670        }
671
672        public Work removeScheduled(String workId) {
673            Work w = queuing.removeScheduled(queueId, workId);
674            return w;
675        }
676
677    }
678
679    @Override
680    public void schedule(Work work) {
681        schedule(work, Scheduling.ENQUEUE, false);
682    }
683
684    @Override
685    public void schedule(Work work, boolean afterCommit) {
686        schedule(work, Scheduling.ENQUEUE, afterCommit);
687    }
688
689    @Override
690    public void schedule(Work work, Scheduling scheduling) {
691        schedule(work, scheduling, false);
692    }
693
694    @Override
695    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
696        String workId = work.getId();
697        String queueId = getCategoryQueueId(work.getCategory());
698        if (!isQueuingEnabled(queueId)) {
699            return;
700        }
701        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
702            return;
703        }
704        work.setWorkInstanceState(State.SCHEDULED);
705        WorkSchedulePath.newInstance(work);
706        if (log.isTraceEnabled()) {
707            log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack());
708        } else if (log.isDebugEnabled()) {
709            log.debug("Scheduling work: " + work + " using queue: " + queueId);
710        }
711        switch (scheduling) {
712            case ENQUEUE:
713                break;
714            case CANCEL_SCHEDULED:
715                Work w = getExecutor(queueId).removeScheduled(workId);
716                if (w != null) {
717                    w.setWorkInstanceState(State.CANCELED);
718                    if (log.isDebugEnabled()) {
719                        log.debug("Canceling existing scheduled work before scheduling");
720                    }
721                }
722                break;
723            case IF_NOT_SCHEDULED:
724            case IF_NOT_RUNNING:
725            case IF_NOT_RUNNING_OR_SCHEDULED:
726                // TODO disabled for now because hasWorkInState uses isScheduled
727                // which is buggy
728                boolean disabled = Boolean.TRUE.booleanValue();
729                if (!disabled && hasWorkInState(workId, scheduling.state)) {
730                    // mark passed work as canceled
731                    work.setWorkInstanceState(State.CANCELED);
732                    if (log.isDebugEnabled()) {
733                        log.debug("Canceling schedule because found: " + scheduling);
734                    }
735                    return;
736
737                }
738                break;
739
740        }
741        getExecutor(queueId).execute(work);
742    }
743
744    /**
745     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
746     *
747     * @since 5.8
748     */
749    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
750        TransactionManager transactionManager;
751        try {
752            transactionManager = TransactionHelper.lookupTransactionManager();
753        } catch (NamingException e) {
754            transactionManager = null;
755        }
756        if (transactionManager == null) {
757            if (log.isDebugEnabled()) {
758                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
759            }
760            return false;
761        }
762        try {
763            Transaction transaction = transactionManager.getTransaction();
764            if (transaction == null) {
765                if (log.isDebugEnabled()) {
766                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
767                }
768                return false;
769            }
770            int status = transaction.getStatus();
771            if (status == Status.STATUS_ACTIVE) {
772                if (log.isDebugEnabled()) {
773                    log.debug("Scheduling work after commit: " + work);
774                }
775                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
776                return true;
777            } else {
778                if (log.isDebugEnabled()) {
779                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
780                            + work);
781                }
782                return false;
783            }
784        } catch (SystemException | RollbackException e) {
785            log.error("Cannot schedule after commit", e);
786            return false;
787        }
788    }
789
790    @Override
791    @Deprecated
792    public Work find(Work work, State state, boolean useEquals, int[] pos) {
793        if (pos != null) {
794            pos[0] = 0; // compat
795        }
796        String workId = work.getId();
797        return queuing.find(workId, state);
798    }
799
800    @Override
801    public Work find(String workId, State state) {
802        return queuing.find(workId, state);
803    }
804
805    @Override
806    public String findResult(String workId) {
807        Work work = find(workId, State.COMPLETED);
808        return work != null ? work.getWorkInstanceResult() : null;
809    }
810
811    /**
812     * @param state SCHEDULED, RUNNING or null for both
813     */
814    protected boolean hasWorkInState(String workId, State state) {
815        return queuing.isWorkInState(workId, state);
816    }
817
818    @Override
819    public State getWorkState(String workId) {
820        return queuing.getWorkState(workId);
821    }
822
823    @Override
824    public List<Work> listWork(String queueId, State state) {
825        // don't return scheduled after commit
826        return queuing.listWork(queueId, state);
827    }
828
829    @Override
830    public List<String> listWorkIds(String queueId, State state) {
831        return queuing.listWorkIds(queueId, state);
832    }
833
834    @Override
835    public int getQueueSize(String queueId, State state) {
836        if (state == null) {
837            return getScheduledOrRunningSize(queueId);
838        }
839        if (state == State.SCHEDULED) {
840            return getScheduledSize(queueId);
841        } else if (state == State.RUNNING) {
842            return getRunningSize(queueId);
843        } else if (state == State.COMPLETED) {
844            return getCompletedSize(queueId);
845        } else {
846            throw new IllegalArgumentException(String.valueOf(state));
847        }
848    }
849
850    @Override
851    @Deprecated
852    public int getNonCompletedWorkSize(String queueId) {
853        return getScheduledOrRunningSize(queueId);
854    }
855
856    protected int getScheduledSize(String queueId) {
857        return queuing.count(queueId, State.SCHEDULED);
858    }
859
860    protected int getRunningSize(String queueId) {
861        return queuing.count(queueId, State.RUNNING);
862    }
863
864    protected int getScheduledOrRunningSize(String queueId) {
865        return getScheduledSize(queueId) + getRunningSize(queueId);
866    }
867
868    protected int getCompletedSize(String queueId) {
869        return queuing.count(queueId, State.COMPLETED);
870    }
871
872    @Override
873    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
874        return awaitCompletion(null, duration, unit);
875    }
876
877    @Override
878    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
879        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
880        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
881        long deadline = getTimestampAfter(durationInMs);
882        int pause = (int) Math.min(duration, 500L);
883        log.debug("awaitForCompletion " + durationInMs + " ms");
884        do {
885            if (noScheduledOrRunningWork(queueId)) {
886                log.debug("Completed");
887                completionSynchronizer.signalCompletedWork();
888                SequenceTracer.stop("done");
889                return true;
890            }
891            completionSynchronizer.waitForCompletedWork(pause);
892        } while (System.currentTimeMillis() < deadline);
893        log.info("awaitCompletion timeout after " + durationInMs + " ms");
894        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
895        return false;
896    }
897
898    protected long getTimestampAfter(long durationInMs) {
899        long ret = System.currentTimeMillis() + durationInMs;
900        if (ret < 0) {
901            ret = Long.MAX_VALUE;
902        }
903        return ret;
904    }
905
906    protected boolean noScheduledOrRunningWork(String queueId) {
907        if (queueId != null) {
908            boolean ret = getQueueSize(queueId, null) == 0;
909            if (ret == false) {
910                if (log.isTraceEnabled()) {
911                    log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) +
912                            ", running: " + getQueueSize(queueId, State.RUNNING));
913                }
914            }
915            return ret;
916        }
917        for (String id : getWorkQueueIds()) {
918            if (getQueueSize(id, null) > 0) {
919                if (log.isTraceEnabled()) {
920                    log.trace(id + " not empty, sched: " + getQueueSize(id, State.SCHEDULED) +
921                            ", running: " + getQueueSize(id, State.RUNNING));
922                }
923                return false;
924            }
925        }
926        return true;
927    }
928
929    @Override
930    public synchronized void clearCompletedWork(String queueId) {
931        queuing.clearCompletedWork(queueId, 0);
932    }
933
934    @Override
935    public synchronized void clearCompletedWork(long completionTime) {
936        for (String queueId : queuing.getCompletedQueueIds()) {
937            queuing.clearCompletedWork(queueId, completionTime);
938        }
939    }
940
941    @Override
942    public synchronized void cleanup() {
943        log.debug("Clearing old completed work");
944        for (String queueId : queuing.getCompletedQueueIds()) {
945            WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId);
946            if (workQueueDescriptor == null) {
947                // unknown queue
948                continue;
949            }
950            long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L;
951            if (delay > 0) {
952                long completionTime = System.currentTimeMillis() - delay;
953                queuing.clearCompletedWork(queueId, completionTime);
954            }
955        }
956    }
957
958    @Override
959    public boolean isStarted() {
960        return started && !shutdownInProgress;
961    }
962
963}