001/*
002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.locks.Condition;
034import java.util.concurrent.locks.ReentrantLock;
035
036import javax.naming.NamingException;
037import javax.transaction.RollbackException;
038import javax.transaction.Status;
039import javax.transaction.Synchronization;
040import javax.transaction.SystemException;
041import javax.transaction.Transaction;
042import javax.transaction.TransactionManager;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.nuxeo.common.logging.SequenceTracer;
047import org.nuxeo.ecm.core.event.EventServiceComponent;
048import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
049import org.nuxeo.ecm.core.work.api.Work;
050import org.nuxeo.ecm.core.work.api.Work.State;
051import org.nuxeo.ecm.core.work.api.WorkManager;
052import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
053import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
054import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
055import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
056import org.nuxeo.runtime.RuntimeServiceEvent;
057import org.nuxeo.runtime.RuntimeServiceListener;
058import org.nuxeo.runtime.api.Framework;
059import org.nuxeo.runtime.metrics.MetricsService;
060import org.nuxeo.runtime.model.ComponentContext;
061import org.nuxeo.runtime.model.ComponentInstance;
062import org.nuxeo.runtime.model.DefaultComponent;
063import org.nuxeo.runtime.transaction.TransactionHelper;
064
065import com.codahale.metrics.Counter;
066import com.codahale.metrics.MetricRegistry;
067import com.codahale.metrics.SharedMetricRegistries;
068import com.codahale.metrics.Timer;
069
070/**
071 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
072 * implementation.
073 *
074 * @since 5.6
075 */
076public class WorkManagerImpl extends DefaultComponent implements WorkManager {
077
078    public static final String NAME = "org.nuxeo.ecm.core.work.service";
079
080    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);
081
082    protected static final String QUEUES_EP = "queues";
083
084    protected static final String IMPL_EP = "implementation";
085
086    public static final String DEFAULT_QUEUE_ID = "default";
087
088    public static final String DEFAULT_CATEGORY = "default";
089
090    protected static final String THREAD_PREFIX = "Nuxeo-Work-";
091
092    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
093
094    // @GuardedBy("itself")
095    protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry();
096
097    protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry();
098
099    // used synchronized
100    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();
101
102    protected WorkQueuing queuing;
103
104    /**
105     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
106     * node in cluster mode.
107     */
108    protected class WorkCompletionSynchronizer {
109        final protected ReentrantLock lock = new ReentrantLock();
110
111        final protected Condition condition = lock.newCondition();
112
113        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
114            lock.lock();
115            try {
116                return condition.await(timeMs, TimeUnit.MILLISECONDS);
117            } finally {
118                lock.unlock();
119            }
120        }
121
122        protected void signalCompletedWork() {
123            lock.lock();
124            try {
125                condition.signalAll();
126            } finally {
127                lock.unlock();
128            }
129        }
130
131    }
132
133    protected WorkCompletionSynchronizer completionSynchronizer;
134
135    @Override
136    public void activate(ComponentContext context) {
137        Framework.addListener(new ShutdownListener());
138    }
139
140    @Override
141    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
142        if (QUEUES_EP.equals(extensionPoint)) {
143            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
144        } else if (IMPL_EP.equals(extensionPoint)) {
145            registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
146        } else {
147            throw new RuntimeException("Unknown extension point: " + extensionPoint);
148        }
149    }
150
151    @Override
152    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
153        if (QUEUES_EP.equals(extensionPoint)) {
154            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
155        } else if (IMPL_EP.equals(extensionPoint)) {
156            unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
157        } else {
158            throw new RuntimeException("Unknown extension point: " + extensionPoint);
159        }
160    }
161
162    void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
163        String queueId = workQueueDescriptor.id;
164        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
165            Boolean processing = workQueueDescriptor.processing;
166            if (processing == null) {
167                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
168                        + " with no processing/queuing");
169                return;
170            }
171            String what = processing == null ? "" : (" processing=" + processing);
172            what += queuing == null ? "" : (" queuing=" + queuing);
173            log.info("Setting on all work queues:" + what);
174            // activate/deactivate processing/queuing on all queues
175            List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy
176            for (String id : queueIds) {
177                // add an updated contribution redefining processing/queuing
178                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
179                wqd.id = id;
180                wqd.processing = processing;
181                registerWorkQueueDescriptor(wqd);
182            }
183            return;
184        }
185        workQueueConfig.addContribution(workQueueDescriptor);
186        WorkQueueDescriptor wqd = workQueueConfig.get(queueId);
187        log.info("Registered work queue " + queueId + " " + wqd.toString());
188    }
189
190    void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
191        String id = workQueueDescriptor.id;
192        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
193            return;
194        }
195        workQueueConfig.removeContribution(workQueueDescriptor);
196        log.info("Unregistered work queue " + id);
197    }
198
199    void activateQueue(WorkQueueDescriptor config) {
200        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
201            return;
202        }
203        NuxeoBlockingQueue queue = queuing.getQueue(config.id);
204        if (queue == null) {
205            queue = queuing.init(config);
206        }
207        WorkThreadPoolExecutor executor = executors.get(config.id);
208        if (executor == null) {
209            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
210            int maxPoolSize = config.getMaxThreads();
211            executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize,
212                    0, TimeUnit.SECONDS,
213                    queue, threadFactory);
214            // prestart all core threads so that direct additions to the queue
215            // (from another Nuxeo instance) can be seen
216            executor.prestartAllCoreThreads();
217            executors.put(config.id, executor);
218        }
219        queuing.setActive(config.id, config.isProcessingEnabled());
220        log.info("Activated work queue " + config.id + " " + config.toEffectiveString());
221    }
222
223    void deactivateQueue(WorkQueueDescriptor config) {
224        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
225            return;
226        }
227        queuing.setActive(config.id, false);
228        WorkThreadPoolExecutor executor = executors.remove(config.id);
229        executor.shutdownAndSuspend();
230        log.info("Deactivated work queue " + config.id);
231    }
232
233    void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
234        workQueuingConfig.addContribution(descr);
235    }
236
237    void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
238        workQueuingConfig.removeContribution(descr);
239    }
240
241    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
242        try {
243            return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
244        } catch (ReflectiveOperationException | SecurityException e) {
245            throw new RuntimeException(e);
246        }
247    }
248
249    @Override
250    public boolean isQueuingEnabled(String queueId) {
251        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
252        return wqd == null ? false : wqd.isQueuingEnabled();
253    }
254
255    @Override
256    public void enableProcessing(boolean value) {
257        for (String queueId : workQueueConfig.getQueueIds()) {
258            queuing.getQueue(queueId).setActive(value);
259        }
260    }
261
262    @Override
263    public void enableProcessing(String queueId, boolean value) throws InterruptedException {
264        WorkQueueDescriptor config = workQueueConfig.get(queueId);
265        if (config == null) {
266            throw new IllegalArgumentException("no such queue " + queueId);
267        }
268        if (!value) {
269            deactivateQueue(config);
270        } else {
271            activateQueue(config);
272        }
273    }
274
275    @Override
276    public boolean isProcessingEnabled() {
277        for (String queueId : workQueueConfig.getQueueIds()) {
278            if (queuing.getQueue(queueId).active) {
279                return true;
280            }
281        }
282        return false;
283    }
284
285    @Override
286    public boolean isProcessingEnabled(String queueId) {
287        return queuing.getQueue(queueId).active;
288    }
289
290    // ----- WorkManager -----
291
292    @Override
293    public List<String> getWorkQueueIds() {
294        synchronized (workQueueConfig) {
295            return workQueueConfig.getQueueIds();
296        }
297    }
298
299    @Override
300    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
301        synchronized (workQueueConfig) {
302            return workQueueConfig.get(queueId);
303        }
304    }
305
306    @Override
307    public String getCategoryQueueId(String category) {
308        if (category == null) {
309            category = DEFAULT_CATEGORY;
310        }
311        String queueId = workQueueConfig.getQueueId(category);
312        if (queueId == null) {
313            queueId = DEFAULT_QUEUE_ID;
314        }
315        return queueId;
316    }
317
318    @Override
319    public int getApplicationStartedOrder() {
320        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
321    }
322
323    @Override
324    public void applicationStarted(ComponentContext context) {
325        init();
326    }
327
328    protected volatile boolean started = false;
329
330    protected volatile boolean shutdownInProgress = false;
331
332    @Override
333    public void init() {
334        if (started) {
335            return;
336        }
337        synchronized (this) {
338            if (started) {
339                return;
340            }
341            queuing = newWorkQueuing(workQueuingConfig.klass);
342            completionSynchronizer = new WorkCompletionSynchronizer();
343            started = true;
344            workQueueConfig.index();
345            for (String id : workQueueConfig.getQueueIds()) {
346                activateQueue(workQueueConfig.get(id));
347            }
348        }
349    }
350
351    protected WorkThreadPoolExecutor getExecutor(String queueId) {
352        if (!started) {
353            if (Framework.isTestModeSet() && !Framework.getRuntime()
354                    .isShuttingDown()) {
355                LogFactory.getLog(WorkManagerImpl.class)
356                        .warn("Lazy starting of work manager in test mode");
357                init();
358            } else {
359                throw new IllegalStateException("Work manager not started, could not access to executors");
360            }
361        }
362        WorkQueueDescriptor workQueueDescriptor;
363        synchronized (workQueueConfig) {
364            workQueueDescriptor = workQueueConfig.get(queueId);
365        }
366        if (workQueueDescriptor == null) {
367            throw new IllegalArgumentException("No such work queue: " + queueId);
368        }
369
370        return executors.get(queueId);
371    }
372
373    @Override
374    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
375        WorkThreadPoolExecutor executor = getExecutor(queueId);
376        boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit);
377        removeExecutor(queueId); // start afresh
378        return terminated;
379    }
380
381    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
382            throws InterruptedException {
383        // mark executors as shutting down
384        for (WorkThreadPoolExecutor executor : list) {
385            executor.shutdownAndSuspend();
386        }
387        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
388        // wait until threads termination
389        for (WorkThreadPoolExecutor executor : list) {
390            long t0 = System.currentTimeMillis();
391            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
392                return false;
393            }
394            timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS);
395        }
396        return true;
397    }
398
399    protected long remainingMillis(long t0, long delay) {
400        long d = System.currentTimeMillis() - t0;
401        if (d > delay) {
402            return 0;
403        }
404        return delay - d;
405    }
406
407    protected synchronized void removeExecutor(String queueId) {
408        executors.remove(queueId);
409    }
410
411    @Override
412    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
413        shutdownInProgress = true;
414        try {
415            return shutdownExecutors(executors.values(), timeout, unit);
416        } finally {
417            shutdownInProgress = false;
418            started = false;
419            completionSynchronizer = null;
420            executors.clear();
421            queuing = null;
422        }
423    }
424
425    protected class ShutdownListener implements RuntimeServiceListener {
426        @Override
427        public void handleEvent(RuntimeServiceEvent event) {
428            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
429                return;
430            }
431            Framework.removeListener(this);
432            try {
433                if (!shutdown(10, TimeUnit.SECONDS)) {
434                    log.error("Some processors are still active");
435                }
436            } catch (InterruptedException cause) {
437                Thread.currentThread().interrupt();
438                log.error("Interrupted during works manager shutdown, continuing runtime shutdown", cause);
439            }
440        }
441    }
442
443    /**
444     * A work instance and how to schedule it, for schedule-after-commit.
445     *
446     * @since 5.8
447     */
448    public class WorkScheduling implements Synchronization {
449        public final Work work;
450
451        public final Scheduling scheduling;
452
453        public WorkScheduling(Work work, Scheduling scheduling) {
454            this.work = work;
455            this.scheduling = scheduling;
456        }
457
458        @Override
459        public void beforeCompletion() {
460        }
461
462        @Override
463        public void afterCompletion(int status) {
464            if (status == Status.STATUS_COMMITTED) {
465                schedule(work, scheduling, false);
466            } else if (status == Status.STATUS_ROLLEDBACK) {
467                work.setWorkInstanceState(State.UNKNOWN);
468            } else {
469                throw new IllegalArgumentException("Unsupported transaction status " + status);
470            }
471        }
472    }
473
474    /**
475     * Creates non-daemon threads at normal priority.
476     */
477    private static class NamedThreadFactory implements ThreadFactory {
478
479        private final AtomicInteger threadNumber = new AtomicInteger();
480
481        private final ThreadGroup group;
482
483        private final String prefix;
484
485        public NamedThreadFactory(String prefix) {
486            SecurityManager sm = System.getSecurityManager();
487            group = sm == null ? Thread.currentThread()
488                    .getThreadGroup() : sm.getThreadGroup();
489            this.prefix = prefix;
490        }
491
492        @Override
493        public Thread newThread(Runnable r) {
494            String name = prefix + threadNumber.incrementAndGet();
495            Thread thread = new Thread(group, r, name);
496            // do not set daemon
497            thread.setPriority(Thread.NORM_PRIORITY);
498            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
499
500                @Override
501                public void uncaughtException(Thread t, Throwable e) {
502                    LogFactory.getLog(WorkManagerImpl.class)
503                            .error("Uncaught error on thread " + t.getName(), e);
504                }
505            });
506            return thread;
507        }
508    }
509
510    /**
511     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
512     * <p>
513     * Completed tasks are passed to another queue.
514     * <p>
515     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
516     * persisted, etc).
517     *
518     * @since 5.6
519     */
520    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
521
522        protected final String queueId;
523
524        /**
525         * List of running Work instances, in order to be able to interrupt them if requested.
526         */
527        // @GuardedBy("itself")
528        protected final ConcurrentLinkedQueue<Work> running;
529
530        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
531        // Number of work scheduled by this instance
532        protected final Counter scheduledCount;
533
534        // Number of work currently running on this instance
535        protected final Counter runningCount;
536
537        // Number of work completed by this instance
538        protected final Counter completedCount;
539
540        protected final Timer workTimer;
541
542
543        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
544                TimeUnit unit, NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
545            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
546            queueId = queue.queueId;
547            running = new ConcurrentLinkedQueue<Work>();
548            // init metrics
549            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
550            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
551            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
552            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
553        }
554
555        public int getScheduledOrRunningSize() {
556            int ret = 0;
557            for (String queueId : getWorkQueueIds()) {
558                ret += getQueueSize(queueId, null);
559            }
560            return ret;
561        }
562
563        @Override
564        public void execute(Runnable r) {
565            throw new UnsupportedOperationException("use other api");
566        }
567
568        /**
569         * Executes the given task sometime in the future.
570         *
571         * @param work the work to execute
572         * @see #execute(Runnable)
573         */
574        public void execute(Work work) {
575            scheduledCount.inc();
576            submit(work);
577        }
578
579        /**
580         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
581         * directly
582         *
583         * @param work
584         * @throws RuntimeException
585         */
586        protected void submit(Work work) throws RuntimeException {
587            queuing.workSchedule(queueId, work);
588        }
589
590        @Override
591        protected void beforeExecute(Thread t, Runnable r) {
592            Work work = WorkHolder.getWork(r);
593            if (started == false || shutdownInProgress == true) {
594                work.setWorkInstanceState(State.SCHEDULED);
595                queuing.workSchedule(queueId, work);
596                return;
597            }
598            work.setWorkInstanceState(State.RUNNING);
599            queuing.workRunning(queueId, work);
600            running.add(work);
601            runningCount.inc();
602        }
603
604        @Override
605        protected void afterExecute(Runnable r, Throwable t) {
606            Work work = WorkHolder.getWork(r);
607            try {
608                if (work.isSuspending()) {
609                    return;
610                }
611                work.setWorkInstanceState(State.UNKNOWN);
612                queuing.workCompleted(queueId, work);
613            } finally {
614                running.remove(work);
615                runningCount.dec();
616                completedCount.inc();
617                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
618                completionSynchronizer.signalCompletedWork();
619            }
620        }
621
622        // called during shutdown
623        // with tasks from the queue if new tasks are submitted
624        // or with tasks drained from the queue
625        protected void removedFromQueue(Runnable r) {
626            Work work = WorkHolder.getWork(r);
627            work.setWorkInstanceState(State.UNKNOWN);
628            completionSynchronizer.signalCompletedWork();
629        }
630
631        /**
632         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
633         */
634        public void shutdownAndSuspend() {
635            // don't consume the queue anymore
636            ((NuxeoBlockingQueue) getQueue()).setActive(false);
637            // shutdown the executor
638            shutdown();
639            // suspend and reschedule all running work
640            synchronized (running) {
641                for (Work work : running) {
642                    work.setWorkInstanceSuspending();
643                    work.setWorkInstanceState(State.SCHEDULED);
644                    queuing.workReschedule(queueId, work);
645                }
646            }
647            shutdownNow();
648        }
649
650        public void removeScheduled(String workId) {
651            queuing.removeScheduled(queueId, workId);
652        }
653
654    }
655
656    @Override
657    public void schedule(Work work) {
658        schedule(work, Scheduling.ENQUEUE, false);
659    }
660
661    @Override
662    public void schedule(Work work, boolean afterCommit) {
663        schedule(work, Scheduling.ENQUEUE, afterCommit);
664    }
665
666    @Override
667    public void schedule(Work work, Scheduling scheduling) {
668        schedule(work, scheduling, false);
669    }
670
671    @Override
672    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
673        String workId = work.getId();
674        String queueId = getCategoryQueueId(work.getCategory());
675        if (!isQueuingEnabled(queueId)) {
676            return;
677        }
678        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
679            return;
680        }
681        work.setWorkInstanceState(State.SCHEDULED);
682        WorkSchedulePath.newInstance(work);
683        switch (scheduling) {
684        case ENQUEUE:
685            break;
686        case CANCEL_SCHEDULED:
687            getExecutor(queueId).removeScheduled(workId);
688            break;
689        case IF_NOT_SCHEDULED:
690        case IF_NOT_RUNNING:
691        case IF_NOT_RUNNING_OR_SCHEDULED:
692            // TODO disabled for now because hasWorkInState uses isScheduled
693            // which is buggy
694            boolean disabled = Boolean.TRUE.booleanValue();
695            if (!disabled && hasWorkInState(workId, scheduling.state)) {
696                if (log.isDebugEnabled()) {
697                    log.debug("Canceling schedule because found: " + scheduling);
698                }
699                return;
700
701            }
702            break;
703
704        }
705        queuing.workSchedule(queueId, work);
706    }
707
708    /**
709     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
710     *
711     * @since 5.8
712     */
713    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
714        TransactionManager transactionManager;
715        try {
716            transactionManager = TransactionHelper.lookupTransactionManager();
717        } catch (NamingException e) {
718            transactionManager = null;
719        }
720        if (transactionManager == null) {
721            if (log.isDebugEnabled()) {
722                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
723            }
724            return false;
725        }
726        try {
727            Transaction transaction = transactionManager.getTransaction();
728            if (transaction == null) {
729                if (log.isDebugEnabled()) {
730                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
731                }
732                return false;
733            }
734            int status = transaction.getStatus();
735            if (status == Status.STATUS_ACTIVE) {
736                if (log.isDebugEnabled()) {
737                    log.debug("Scheduling work after commit: " + work);
738                }
739                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
740                return true;
741            } else if (status == Status.STATUS_COMMITTED) {
742                // called in afterCompletion, we can schedule immediately
743                if (log.isDebugEnabled()) {
744                    log.debug("Scheduling work immediately: " + work);
745                }
746                return false;
747            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
748                if (log.isDebugEnabled()) {
749                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
750                }
751                return true;
752            } else {
753                if (log.isDebugEnabled()) {
754                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
755                            + work);
756                }
757                return false;
758            }
759        } catch (SystemException | RollbackException e) {
760            log.error("Cannot schedule after commit", e);
761            return false;
762        }
763    }
764
765    @Override
766    @Deprecated
767    public Work find(Work work, State state, boolean useEquals, int[] pos) {
768        if (pos != null) {
769            pos[0] = 0; // compat
770        }
771        String workId = work.getId();
772        return queuing.find(workId, state);
773    }
774
775    @Override
776    public Work find(String workId, State state) {
777        return queuing.find(workId, state);
778    }
779
780    /**
781     * @param state SCHEDULED, RUNNING or null for both
782     */
783    protected boolean hasWorkInState(String workId, State state) {
784        return queuing.isWorkInState(workId, state);
785    }
786
787    @Override
788    public State getWorkState(String workId) {
789        return queuing.getWorkState(workId);
790    }
791
792    @Override
793    public List<Work> listWork(String queueId, State state) {
794        // don't return scheduled after commit
795        return queuing.listWork(queueId, state);
796    }
797
798    @Override
799    public List<String> listWorkIds(String queueId, State state) {
800        return queuing.listWorkIds(queueId, state);
801    }
802
803    @Override
804    public WorkQueueMetrics getMetrics(String queueId) {
805        return queuing.metrics(queueId);
806    }
807
808    @Override
809    public int getQueueSize(String queueId, State state) {
810        WorkQueueMetrics metrics = getMetrics(queueId);
811        if (state == null) {
812            return metrics.scheduled.intValue() + metrics.running.intValue();
813        }
814        if (state == State.SCHEDULED) {
815            return metrics.scheduled.intValue();
816        } else if (state == State.RUNNING) {
817            return metrics.running.intValue();
818        } else {
819            throw new IllegalArgumentException(String.valueOf(state));
820        }
821    }
822
823    @Override
824    @Deprecated
825    public int getNonCompletedWorkSize(String queueId) {
826        return getQueueSize(queueId, null);
827    }
828
829    @Override
830    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
831        return awaitCompletion(null, duration, unit);
832    }
833
834    @Override
835    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
836        if (!isStarted()) {
837            return true;
838        }
839        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
840        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
841        long deadline = getTimestampAfter(durationInMs);
842        int pause = (int) Math.min(duration, 500L);
843        log.debug("awaitForCompletion " + durationInMs + " ms");
844        do {
845            if (noScheduledOrRunningWork(queueId)) {
846                completionSynchronizer.signalCompletedWork();
847                SequenceTracer.stop("done");
848                return true;
849            }
850            completionSynchronizer.waitForCompletedWork(pause);
851        } while (System.currentTimeMillis() < deadline);
852        log.info("awaitCompletion timeout after " + durationInMs + " ms");
853        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
854        return false;
855    }
856
857    protected long getTimestampAfter(long durationInMs) {
858        long ret = System.currentTimeMillis() + durationInMs;
859        if (ret < 0) {
860            ret = Long.MAX_VALUE;
861        }
862        return ret;
863    }
864
865    protected boolean noScheduledOrRunningWork(String queueId) {
866        if (queueId == null) {
867            for (String id : getWorkQueueIds()) {
868                if (!noScheduledOrRunningWork(id)) {
869                    return false;
870                }
871            }
872            return true;
873        }
874        boolean ret = getQueueSize(queueId, null) == 0;
875        if (ret == false) {
876            if (log.isTraceEnabled()) {
877                log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) +
878                        ", running: " + getQueueSize(queueId, State.RUNNING));
879            }
880            return false;
881        }
882        if (log.isTraceEnabled()) {
883            log.trace(queueId + " is completed");
884        }
885        return true;
886    }
887
888    @Override
889    public boolean isStarted() {
890        return started && !shutdownInProgress;
891    }
892
893}