001/*
002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.RejectedExecutionException;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.common.logging.SequenceTracer;
048import org.nuxeo.ecm.core.event.EventServiceComponent;
049import org.nuxeo.ecm.core.work.WorkQueuing.Listener;
050import org.nuxeo.ecm.core.work.api.Work;
051import org.nuxeo.ecm.core.work.api.Work.State;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
054import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
055import org.nuxeo.ecm.core.work.api.WorkQueuingDescriptor;
056import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
057import org.nuxeo.runtime.RuntimeServiceEvent;
058import org.nuxeo.runtime.RuntimeServiceListener;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.metrics.MetricsService;
061import org.nuxeo.runtime.model.ComponentContext;
062import org.nuxeo.runtime.model.ComponentInstance;
063import org.nuxeo.runtime.model.DefaultComponent;
064import org.nuxeo.runtime.transaction.TransactionHelper;
065
066import com.codahale.metrics.Counter;
067import com.codahale.metrics.MetricRegistry;
068import com.codahale.metrics.SharedMetricRegistries;
069import com.codahale.metrics.Timer;
070
071/**
072 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
073 * implementation.
074 *
075 * @since 5.6
076 */
077public class WorkManagerImpl extends DefaultComponent implements WorkManager {
078
079    public static final String NAME = "org.nuxeo.ecm.core.work.service";
080
081    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);
082
083    protected static final String QUEUES_EP = "queues";
084
085    protected static final String IMPL_EP = "implementation";
086
087    public static final String DEFAULT_QUEUE_ID = "default";
088
089    public static final String DEFAULT_CATEGORY = "default";
090
091    protected static final String THREAD_PREFIX = "Nuxeo-Work-";
092
093    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
094
095    // @GuardedBy("itself")
096    protected final WorkQueueRegistry workQueueConfig = new WorkQueueRegistry();
097
098    protected final WorkQueuingRegistry workQueuingConfig = new WorkQueuingRegistry();
099
100    // used synchronized
101    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();
102
103    protected WorkQueuing queuing;
104
105    /**
106     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from another
107     * node in cluster mode.
108     */
109    protected class WorkCompletionSynchronizer {
110        final protected ReentrantLock lock = new ReentrantLock();
111
112        final protected Condition condition = lock.newCondition();
113
114        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
115            lock.lock();
116            try {
117                return condition.await(timeMs, TimeUnit.MILLISECONDS);
118            } finally {
119                lock.unlock();
120            }
121        }
122
123        protected void signalCompletedWork() {
124            lock.lock();
125            try {
126                condition.signalAll();
127            } finally {
128                lock.unlock();
129            }
130        }
131
132    }
133
134    protected WorkCompletionSynchronizer completionSynchronizer;
135
136    @Override
137    public void activate(ComponentContext context) {
138        Framework.addListener(new ShutdownListener());
139    }
140
141    @Override
142    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
143        if (QUEUES_EP.equals(extensionPoint)) {
144            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
145        } else if (IMPL_EP.equals(extensionPoint)) {
146            registerWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
147        } else {
148            throw new RuntimeException("Unknown extension point: " + extensionPoint);
149        }
150    }
151
152    @Override
153    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
154        if (QUEUES_EP.equals(extensionPoint)) {
155            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
156        } else if (IMPL_EP.equals(extensionPoint)) {
157            unregisterWorkQueuingDescriptor((WorkQueuingDescriptor) contribution);
158        } else {
159            throw new RuntimeException("Unknown extension point: " + extensionPoint);
160        }
161    }
162
163    void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
164        String queueId = workQueueDescriptor.id;
165        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
166            Boolean processing = workQueueDescriptor.processing;
167            if (processing == null) {
168                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
169                        + " with no processing/queuing");
170                return;
171            }
172            String what = processing == null ? "" : (" processing=" + processing);
173            what += queuing == null ? "" : (" queuing=" + queuing);
174            log.info("Setting on all work queues:" + what);
175            // activate/deactivate processing/queuing on all queues
176            List<String> queueIds = new ArrayList<>(workQueueConfig.getQueueIds()); // copy
177            for (String id : queueIds) {
178                // add an updated contribution redefining processing/queuing
179                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
180                wqd.id = id;
181                wqd.processing = processing;
182                registerWorkQueueDescriptor(wqd);
183            }
184            return;
185        }
186        workQueueConfig.addContribution(workQueueDescriptor);
187        WorkQueueDescriptor wqd = workQueueConfig.get(queueId);
188        log.info("Registered work queue " + queueId + " " + wqd.toString());
189    }
190
191    void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
192        String id = workQueueDescriptor.id;
193        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
194            return;
195        }
196        workQueueConfig.removeContribution(workQueueDescriptor);
197        log.info("Unregistered work queue " + id);
198    }
199
200    void activateQueue(WorkQueueDescriptor config) {
201        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
202            return;
203        }
204        NuxeoBlockingQueue queue = queuing.getQueue(config.id);
205        if (queue == null) {
206            queue = queuing.init(config);
207        }
208        WorkThreadPoolExecutor executor = executors.get(config.id);
209        if (executor == null) {
210            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + config.id + "-");
211            int maxPoolSize = config.getMaxThreads();
212            executor = new WorkThreadPoolExecutor(maxPoolSize, maxPoolSize,
213                    0, TimeUnit.SECONDS,
214                    queue, threadFactory);
215            // prestart all core threads so that direct additions to the queue
216            // (from another Nuxeo instance) can be seen
217            executor.prestartAllCoreThreads();
218            executors.put(config.id, executor);
219        }
220        queuing.setActive(config.id, config.isProcessingEnabled());
221        log.info("Activated work queue " + config.id + " " + config.toEffectiveString());
222    }
223
224    void deactivateQueue(WorkQueueDescriptor config) {
225        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
226            return;
227        }
228        WorkThreadPoolExecutor executor = executors.get(config.id);
229        try {
230            executor.shutdownAndSuspend();
231        } catch (InterruptedException cause) {
232            Thread.currentThread().interrupt();
233            throw new RuntimeException("Interrupted while deactivating queue " + config.id, cause);
234        }
235        log.info("Deactivated work queue " + config.id);
236    }
237
238    void registerWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
239        workQueuingConfig.addContribution(descr);
240    }
241
242    void unregisterWorkQueuingDescriptor(WorkQueuingDescriptor descr) {
243        workQueuingConfig.removeContribution(descr);
244    }
245
246    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
247        try {
248            return klass.getDeclaredConstructor(Listener.class).newInstance(Listener.lookupListener());
249        } catch (ReflectiveOperationException | SecurityException e) {
250            throw new RuntimeException(e);
251        }
252    }
253
254    @Override
255    public boolean isQueuingEnabled(String queueId) {
256        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
257        return wqd == null ? false : wqd.isQueuingEnabled();
258    }
259
260    @Override
261    public void enableProcessing(boolean value) {
262        for (String queueId : workQueueConfig.getQueueIds()) {
263            queuing.getQueue(queueId).setActive(value);
264        }
265    }
266
267    @Override
268    public void enableProcessing(String queueId, boolean value) throws InterruptedException {
269        WorkQueueDescriptor config = workQueueConfig.get(queueId);
270        if (config == null) {
271            throw new IllegalArgumentException("no such queue " + queueId);
272        }
273        if (!value) {
274            deactivateQueue(config);
275        } else {
276            activateQueue(config);
277        }
278    }
279
280    @Override
281    public boolean isProcessingEnabled() {
282        for (String queueId : workQueueConfig.getQueueIds()) {
283            if (queuing.getQueue(queueId).active) {
284                return true;
285            }
286        }
287        return false;
288    }
289
290    @Override
291    public boolean isProcessingEnabled(String queueId) {
292        if (queueId == null) {
293            return isProcessingEnabled();
294        }
295        return queuing.getQueue(queueId).active;
296    }
297
298    // ----- WorkManager -----
299
300    @Override
301    public List<String> getWorkQueueIds() {
302        synchronized (workQueueConfig) {
303            return workQueueConfig.getQueueIds();
304        }
305    }
306
307    @Override
308    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
309        synchronized (workQueueConfig) {
310            return workQueueConfig.get(queueId);
311        }
312    }
313
314    @Override
315    public String getCategoryQueueId(String category) {
316        if (category == null) {
317            category = DEFAULT_CATEGORY;
318        }
319        String queueId = workQueueConfig.getQueueId(category);
320        if (queueId == null) {
321            queueId = DEFAULT_QUEUE_ID;
322        }
323        return queueId;
324    }
325
326    @Override
327    public int getApplicationStartedOrder() {
328        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
329    }
330
331    @Override
332    public void applicationStarted(ComponentContext context) {
333        init();
334    }
335
336    protected volatile boolean started = false;
337
338    protected volatile boolean shutdownInProgress = false;
339
340    @Override
341    public void init() {
342        if (started) {
343            return;
344        }
345        synchronized (this) {
346            if (started) {
347                return;
348            }
349            queuing = newWorkQueuing(workQueuingConfig.klass);
350            completionSynchronizer = new WorkCompletionSynchronizer();
351            started = true;
352            workQueueConfig.index();
353            for (String id : workQueueConfig.getQueueIds()) {
354                activateQueue(workQueueConfig.get(id));
355            }
356        }
357    }
358
359    protected WorkThreadPoolExecutor getExecutor(String queueId) {
360        if (!started) {
361            if (Framework.isTestModeSet() && !Framework.getRuntime()
362                    .isShuttingDown()) {
363                LogFactory.getLog(WorkManagerImpl.class)
364                        .warn("Lazy starting of work manager in test mode");
365                init();
366            } else {
367                throw new IllegalStateException("Work manager not started, could not access to executors");
368            }
369        }
370        WorkQueueDescriptor workQueueDescriptor;
371        synchronized (workQueueConfig) {
372            workQueueDescriptor = workQueueConfig.get(queueId);
373        }
374        if (workQueueDescriptor == null) {
375            throw new IllegalArgumentException("No such work queue: " + queueId);
376        }
377
378        return executors.get(queueId);
379    }
380
381    @Override
382    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
383        WorkThreadPoolExecutor executor = getExecutor(queueId);
384        return shutdownExecutors(Collections.singleton(executor), timeout, unit);
385    }
386
387    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
388            throws InterruptedException {
389        // mark executors as shutting down
390        for (WorkThreadPoolExecutor executor : list) {
391            executor.shutdownAndSuspend();
392        }
393        timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
394        // wait until threads termination
395        for (WorkThreadPoolExecutor executor : list) {
396            long t0 = System.currentTimeMillis();
397            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
398                return false;
399            }
400            timeout -= unit.convert(System.currentTimeMillis() - t0, TimeUnit.MILLISECONDS);
401        }
402        return true;
403    }
404
405    protected long remainingMillis(long t0, long delay) {
406        long d = System.currentTimeMillis() - t0;
407        if (d > delay) {
408            return 0;
409        }
410        return delay - d;
411    }
412
413    protected synchronized void removeExecutor(String queueId) {
414        executors.remove(queueId);
415    }
416
417    @Override
418    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
419        shutdownInProgress = true;
420        try {
421            return shutdownExecutors(new ArrayList<>(executors.values()), timeout, unit);
422        } finally {
423            shutdownInProgress = false;
424            started = false;
425        }
426    }
427
428    protected class ShutdownListener implements RuntimeServiceListener {
429        @Override
430        public void handleEvent(RuntimeServiceEvent event) {
431            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
432                return;
433            }
434            Framework.removeListener(this);
435            try {
436                if (!shutdown(10, TimeUnit.SECONDS)) {
437                    log.error("Some processors are still active");
438                }
439            } catch (InterruptedException cause) {
440                Thread.currentThread().interrupt();
441                log.error("Interrupted during works manager shutdown, continuing runtime shutdown", cause);
442            }
443        }
444    }
445
446    /**
447     * A work instance and how to schedule it, for schedule-after-commit.
448     *
449     * @since 5.8
450     */
451    public class WorkScheduling implements Synchronization {
452        public final Work work;
453
454        public final Scheduling scheduling;
455
456        public WorkScheduling(Work work, Scheduling scheduling) {
457            this.work = work;
458            this.scheduling = scheduling;
459        }
460
461        @Override
462        public void beforeCompletion() {
463        }
464
465        @Override
466        public void afterCompletion(int status) {
467            if (status == Status.STATUS_COMMITTED) {
468                schedule(work, scheduling, false);
469            } else if (status == Status.STATUS_ROLLEDBACK) {
470                work.setWorkInstanceState(State.UNKNOWN);
471            } else {
472                throw new IllegalArgumentException("Unsupported transaction status " + status);
473            }
474        }
475    }
476
477    /**
478     * Creates non-daemon threads at normal priority.
479     */
480    private static class NamedThreadFactory implements ThreadFactory {
481
482        private final AtomicInteger threadNumber = new AtomicInteger();
483
484        private final ThreadGroup group;
485
486        private final String prefix;
487
488        public NamedThreadFactory(String prefix) {
489            SecurityManager sm = System.getSecurityManager();
490            group = sm == null ? Thread.currentThread()
491                    .getThreadGroup() : sm.getThreadGroup();
492            this.prefix = prefix;
493        }
494
495        @Override
496        public Thread newThread(Runnable r) {
497            String name = prefix + threadNumber.incrementAndGet();
498            Thread thread = new Thread(group, r, name);
499            // do not set daemon
500            thread.setPriority(Thread.NORM_PRIORITY);
501            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
502
503                @Override
504                public void uncaughtException(Thread t, Throwable e) {
505                    LogFactory.getLog(WorkManagerImpl.class)
506                            .error("Uncaught error on thread " + t.getName(), e);
507                }
508            });
509            return thread;
510        }
511    }
512
513    /**
514     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
515     * <p>
516     * Completed tasks are passed to another queue.
517     * <p>
518     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
519     * persisted, etc).
520     *
521     * @since 5.6
522     */
523    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
524
525        protected final String queueId;
526
527        /**
528         * List of running Work instances, in order to be able to interrupt them if requested.
529         */
530        // @GuardedBy("itself")
531        protected final ConcurrentLinkedQueue<Work> running;
532
533        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
534        // Number of work scheduled by this instance
535        protected final Counter scheduledCount;
536
537        // Number of work currently running on this instance
538        protected final Counter runningCount;
539
540        // Number of work completed by this instance
541        protected final Counter completedCount;
542
543        protected final Timer workTimer;
544
545
546        protected WorkThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
547                TimeUnit unit, NuxeoBlockingQueue queue, ThreadFactory threadFactory) {
548            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory);
549            queueId = queue.queueId;
550            running = new ConcurrentLinkedQueue<Work>();
551            // init metrics
552            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
553            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
554            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
555            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
556        }
557
558        public int getScheduledOrRunningSize() {
559            int ret = 0;
560            for (String queueId : getWorkQueueIds()) {
561                ret += getQueueSize(queueId, null);
562            }
563            return ret;
564        }
565
566        @Override
567        public void execute(Runnable r) {
568            throw new UnsupportedOperationException("use other api");
569        }
570
571        /**
572         * Executes the given task sometime in the future.
573         *
574         * @param work the work to execute
575         * @see #execute(Runnable)
576         */
577        public void execute(Work work) {
578            scheduledCount.inc();
579            submit(work);
580        }
581
582        /**
583         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
584         * directly
585         *
586         * @param work
587         * @throws RuntimeException
588         */
589        protected void submit(Work work) throws RuntimeException {
590            queuing.workSchedule(queueId, work);
591        }
592
593        @Override
594        protected void beforeExecute(Thread t, Runnable r) {
595            Work work = WorkHolder.getWork(r);
596            if (isShutdown()) {
597                work.setWorkInstanceState(State.SCHEDULED);
598                queuing.workReschedule(queueId, work);
599                throw new RejectedExecutionException(queueId + " was shutdown, rescheduled " + work);
600            }
601            work.setWorkInstanceState(State.RUNNING);
602            queuing.workRunning(queueId, work);
603            running.add(work);
604            runningCount.inc();
605        }
606
607        @Override
608        protected void afterExecute(Runnable r, Throwable t) {
609            Work work = WorkHolder.getWork(r);
610            try {
611                if (work.isSuspending()) {
612                    log.trace(work + " is suspending, giving up");
613                    return;
614                }
615                if (isShutdown()) {
616                    log.trace("rescheduling " + work.getId(), t);
617                    work.setWorkInstanceState(State.SCHEDULED);
618                    queuing.workReschedule(queueId, work);
619                    return;
620                }
621                work.setWorkInstanceState(State.UNKNOWN);
622                queuing.workCompleted(queueId, work);
623            } finally {
624                running.remove(work);
625                runningCount.dec();
626                completedCount.inc();
627                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
628                completionSynchronizer.signalCompletedWork();
629            }
630        }
631
632
633        /**
634         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
635         * @throws InterruptedException
636         */
637        public void shutdownAndSuspend() throws InterruptedException {
638            try {
639                // don't consume the queue anymore
640                queuing.setActive(queueId, false);
641                // suspend all running work
642                for (Work work : running) {
643                    work.setWorkInstanceSuspending();
644                    log.trace("suspending and rescheduling " + work.getId());
645                    work.setWorkInstanceState(State.SCHEDULED);
646                    queuing.workReschedule(queueId, work);
647                }
648                shutdownNow();
649            } finally {
650                executors.remove(queueId);
651            }
652        }
653
654        public void removeScheduled(String workId) {
655            queuing.removeScheduled(queueId, workId);
656        }
657
658    }
659
660    @Override
661    public void schedule(Work work) {
662        schedule(work, Scheduling.ENQUEUE, false);
663    }
664
665    @Override
666    public void schedule(Work work, boolean afterCommit) {
667        schedule(work, Scheduling.ENQUEUE, afterCommit);
668    }
669
670    @Override
671    public void schedule(Work work, Scheduling scheduling) {
672        schedule(work, scheduling, false);
673    }
674
675    @Override
676    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
677        String workId = work.getId();
678        String queueId = getCategoryQueueId(work.getCategory());
679        if (!isQueuingEnabled(queueId)) {
680            return;
681        }
682        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
683            return;
684        }
685        work.setWorkInstanceState(State.SCHEDULED);
686        WorkSchedulePath.newInstance(work);
687        switch (scheduling) {
688        case ENQUEUE:
689            break;
690        case CANCEL_SCHEDULED:
691            getExecutor(queueId).removeScheduled(workId);
692            break;
693        case IF_NOT_SCHEDULED:
694        case IF_NOT_RUNNING:
695        case IF_NOT_RUNNING_OR_SCHEDULED:
696            // TODO disabled for now because hasWorkInState uses isScheduled
697            // which is buggy
698            boolean disabled = Boolean.TRUE.booleanValue();
699            if (!disabled && hasWorkInState(workId, scheduling.state)) {
700                if (log.isDebugEnabled()) {
701                    log.debug("Canceling schedule because found: " + scheduling);
702                }
703                return;
704
705            }
706            break;
707
708        }
709        queuing.workSchedule(queueId, work);
710    }
711
712    /**
713     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
714     *
715     * @since 5.8
716     */
717    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
718        TransactionManager transactionManager;
719        try {
720            transactionManager = TransactionHelper.lookupTransactionManager();
721        } catch (NamingException e) {
722            transactionManager = null;
723        }
724        if (transactionManager == null) {
725            if (log.isDebugEnabled()) {
726                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
727            }
728            return false;
729        }
730        try {
731            Transaction transaction = transactionManager.getTransaction();
732            if (transaction == null) {
733                if (log.isDebugEnabled()) {
734                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
735                }
736                return false;
737            }
738            int status = transaction.getStatus();
739            if (status == Status.STATUS_ACTIVE) {
740                if (log.isDebugEnabled()) {
741                    log.debug("Scheduling work after commit: " + work);
742                }
743                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
744                return true;
745            } else if (status == Status.STATUS_COMMITTED) {
746                // called in afterCompletion, we can schedule immediately
747                if (log.isDebugEnabled()) {
748                    log.debug("Scheduling work immediately: " + work);
749                }
750                return false;
751            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
752                if (log.isDebugEnabled()) {
753                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
754                }
755                return true;
756            } else {
757                if (log.isDebugEnabled()) {
758                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
759                            + work);
760                }
761                return false;
762            }
763        } catch (SystemException | RollbackException e) {
764            log.error("Cannot schedule after commit", e);
765            return false;
766        }
767    }
768
769    @Override
770    @Deprecated
771    public Work find(Work work, State state, boolean useEquals, int[] pos) {
772        if (pos != null) {
773            pos[0] = 0; // compat
774        }
775        String workId = work.getId();
776        return queuing.find(workId, state);
777    }
778
779    @Override
780    public Work find(String workId, State state) {
781        return queuing.find(workId, state);
782    }
783
784    /**
785     * @param state SCHEDULED, RUNNING or null for both
786     */
787    protected boolean hasWorkInState(String workId, State state) {
788        return queuing.isWorkInState(workId, state);
789    }
790
791    @Override
792    public State getWorkState(String workId) {
793        return queuing.getWorkState(workId);
794    }
795
796    @Override
797    public List<Work> listWork(String queueId, State state) {
798        // don't return scheduled after commit
799        return queuing.listWork(queueId, state);
800    }
801
802    @Override
803    public List<String> listWorkIds(String queueId, State state) {
804        return queuing.listWorkIds(queueId, state);
805    }
806
807    @Override
808    public WorkQueueMetrics getMetrics(String queueId) {
809        return queuing.metrics(queueId);
810    }
811
812    @Override
813    public int getQueueSize(String queueId, State state) {
814        WorkQueueMetrics metrics = getMetrics(queueId);
815        if (state == null) {
816            return metrics.scheduled.intValue() + metrics.running.intValue();
817        }
818        if (state == State.SCHEDULED) {
819            return metrics.scheduled.intValue();
820        } else if (state == State.RUNNING) {
821            return metrics.running.intValue();
822        } else {
823            throw new IllegalArgumentException(String.valueOf(state));
824        }
825    }
826
827    @Override
828    @Deprecated
829    public int getNonCompletedWorkSize(String queueId) {
830        return getQueueSize(queueId, null);
831    }
832
833    @Override
834    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
835        return awaitCompletion(null, duration, unit);
836    }
837
838    @Override
839    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
840        if (!isStarted()) {
841            return true;
842        }
843        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
844        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
845        long deadline = getTimestampAfter(durationInMs);
846        int pause = (int) Math.min(duration, 500L);
847        log.debug("awaitForCompletion " + durationInMs + " ms");
848        do {
849            if (noScheduledOrRunningWork(queueId)) {
850                completionSynchronizer.signalCompletedWork();
851                SequenceTracer.stop("done");
852                return true;
853            }
854            completionSynchronizer.waitForCompletedWork(pause);
855        } while (System.currentTimeMillis() < deadline);
856        log.info("awaitCompletion timeout after " + durationInMs + " ms");
857        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
858        return false;
859    }
860
861    protected long getTimestampAfter(long durationInMs) {
862        long ret = System.currentTimeMillis() + durationInMs;
863        if (ret < 0) {
864            ret = Long.MAX_VALUE;
865        }
866        return ret;
867    }
868
869    protected boolean noScheduledOrRunningWork(String queueId) {
870        if (queueId == null) {
871            for (String id : getWorkQueueIds()) {
872                if (!noScheduledOrRunningWork(id)) {
873                    return false;
874                }
875            }
876            return true;
877        }
878        if (!isProcessingEnabled(queueId)) {
879            return getExecutor(queueId).runningCount.getCount() == 0L;
880        }
881        boolean ret = getQueueSize(queueId, null) == 0;
882        if (ret == false) {
883            if (log.isTraceEnabled()) {
884                log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) +
885                        ", running: " + getQueueSize(queueId, State.RUNNING));
886            }
887            return false;
888        }
889        if (log.isTraceEnabled()) {
890            log.trace(queueId + " is completed");
891        }
892        return true;
893    }
894
895    @Override
896    public boolean isStarted() {
897        return started && !shutdownInProgress;
898    }
899
900}