001/*
002 * (C) Copyright 2012-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.lang.reflect.Constructor;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ConcurrentLinkedQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.common.logging.SequenceTracer;
048import org.nuxeo.ecm.core.event.EventServiceComponent;
049import org.nuxeo.ecm.core.work.api.Work;
050import org.nuxeo.ecm.core.work.api.Work.State;
051import org.nuxeo.ecm.core.work.api.WorkManager;
052import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
053import org.nuxeo.ecm.core.work.api.WorkQueuingImplDescriptor;
054import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
055import org.nuxeo.runtime.RuntimeServiceEvent;
056import org.nuxeo.runtime.RuntimeServiceListener;
057import org.nuxeo.runtime.api.Framework;
058import org.nuxeo.runtime.metrics.MetricsService;
059import org.nuxeo.runtime.model.ComponentContext;
060import org.nuxeo.runtime.model.ComponentInstance;
061import org.nuxeo.runtime.model.DefaultComponent;
062import org.nuxeo.runtime.transaction.TransactionHelper;
063
064import com.codahale.metrics.Counter;
065import com.codahale.metrics.MetricRegistry;
066import com.codahale.metrics.SharedMetricRegistries;
067import com.codahale.metrics.Timer;
068
069/**
070 * The implementation of a {@link WorkManager}. This delegates the queuing implementation to a {@link WorkQueuing}
071 * implementation.
072 *
073 * @since 5.6
074 */
075public class WorkManagerImpl extends DefaultComponent implements WorkManager {
076
    /** Name constant of this component (NOTE(review): presumably the runtime component name - confirm). */
    public static final String NAME = "org.nuxeo.ecm.core.work.service";

    /** Logger shared by the work manager. */
    private static final Log log = LogFactory.getLog(WorkManagerImpl.class);

    /** Extension point receiving {@link WorkQueueDescriptor} contributions. */
    protected static final String QUEUES_EP = "queues";

    /** Extension point receiving {@link WorkQueuingImplDescriptor} contributions. */
    protected static final String IMPL_EP = "implementation";

    /** Queue id returned by {@link #getCategoryQueueId(String)} when no queue is mapped to the category. */
    public static final String DEFAULT_QUEUE_ID = "default";

    /** Category used by {@link #getCategoryQueueId(String)} when the given category is {@code null}. */
    public static final String DEFAULT_CATEGORY = "default";

    /** Prefix of the names given to the worker threads; the queue id and a counter are appended. */
    protected static final String THREAD_PREFIX = "Nuxeo-Work-";

    /** Metric registry looked up under the {@link MetricsService} class name; holds the per-queue counters/timers. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    // @GuardedBy("itself")
    // registry of the merged work queue descriptors, by queue id
    protected final WorkQueueDescriptorRegistry workQueueDescriptors = new WorkQueueDescriptorRegistry(this);

    // used synchronized
    // one executor per activated queue, keyed by queue id
    // NOTE(review): several accesses in this class are not synchronized - confirm the intended locking
    protected final Map<String, WorkThreadPoolExecutor> executors = new HashMap<>();

    /** Queuing implementation in use; in-memory until another implementation is registered. */
    protected WorkQueuing queuing = newWorkQueuing(MemoryWorkQueuing.class);
100
101    /**
102     * Simple synchronizer to wake up when an in-JVM work is completed. Does not wake up on work completion from
103     * another node in cluster mode.
104     */
105    protected class WorkCompletionSynchronizer {
106        final protected ReentrantLock lock = new ReentrantLock();
107        final protected Condition condition = lock.newCondition();
108
109        protected boolean waitForCompletedWork(long timeMs) throws InterruptedException {
110            lock.lock();
111            try {
112                return condition.await(timeMs, TimeUnit.MILLISECONDS);
113            } finally {
114                lock.unlock();
115            }
116        }
117
118        protected void signalCompletedWork() {
119            lock.lock();
120            try {
121                condition.signalAll();
122            } finally {
123                lock.unlock();
124            }
125        }
126
127    }
128
    // created by init() and reset to null at the end of shutdown(), hence null while the manager is not started
    protected WorkCompletionSynchronizer completionSynchronizer;
130
131    @Override
132    public void activate(ComponentContext context) {
133        Framework.addListener(new ShutdownListener());
134    }
135
136    @Override
137    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
138        if (QUEUES_EP.equals(extensionPoint)) {
139            registerWorkQueueDescriptor((WorkQueueDescriptor) contribution);
140        } else if (IMPL_EP.equals(extensionPoint)) {
141            registerWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
142        } else {
143            throw new RuntimeException("Unknown extension point: " + extensionPoint);
144        }
145    }
146
147    @Override
148    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
149        if (QUEUES_EP.equals(extensionPoint)) {
150            unregisterWorkQueueDescriptor((WorkQueueDescriptor) contribution);
151        } else if (IMPL_EP.equals(extensionPoint)) {
152            unregisterWorkQueuingDescriptor((WorkQueuingImplDescriptor) contribution);
153        } else {
154            throw new RuntimeException("Unknown extension point: " + extensionPoint);
155        }
156    }
157
158    public void registerWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
159        String queueId = workQueueDescriptor.id;
160        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
161            Boolean processing = workQueueDescriptor.processing;
162            Boolean queuing = workQueueDescriptor.queuing;
163            if (processing == null && queuing == null) {
164                log.error("Ignoring work queue descriptor " + WorkQueueDescriptor.ALL_QUEUES
165                        + " with no processing/queuing");
166                return;
167            }
168            String what = processing == null ? "" : (" processing=" + processing);
169            what += queuing == null ? "" : (" queuing=" + queuing);
170            log.info("Setting on all work queues:" + what);
171            // activate/deactivate processing/queuing on all queues
172            List<String> queueIds = new ArrayList<>(workQueueDescriptors.getQueueIds()); // copy
173            for (String id : queueIds) {
174                // add an updated contribution redefining processing/queuing
175                WorkQueueDescriptor wqd = new WorkQueueDescriptor();
176                wqd.id = id;
177                wqd.processing = processing;
178                wqd.queuing = queuing;
179                registerWorkQueueDescriptor(wqd);
180            }
181            return;
182        }
183        workQueueDescriptors.addContribution(workQueueDescriptor);
184        WorkQueueDescriptor wqd = workQueueDescriptors.get(queueId);
185        log.info("Registered work queue " + queueId + " " + wqd.toString());
186    }
187
188    public void unregisterWorkQueueDescriptor(WorkQueueDescriptor workQueueDescriptor) {
189        String id = workQueueDescriptor.id;
190        if (WorkQueueDescriptor.ALL_QUEUES.equals(id)) {
191            return;
192        }
193        workQueueDescriptors.removeContribution(workQueueDescriptor);
194        log.info("Unregistered work queue " + id);
195    }
196
197    protected void activateQueue(WorkQueueDescriptor workQueueDescriptor) {
198        String id = workQueueDescriptor.id;
199        WorkThreadPoolExecutor executor = executors.get(id);
200        if (executor == null) {
201            ThreadFactory threadFactory = new NamedThreadFactory(THREAD_PREFIX + id + "-");
202            int maxPoolSize = workQueueDescriptor.getMaxThreads();
203            executor = new WorkThreadPoolExecutor(id, maxPoolSize, maxPoolSize, 0, TimeUnit.SECONDS, threadFactory);
204            // prestart all core threads so that direct additions to the queue
205            // (from another Nuxeo instance) can be seen
206            executor.prestartAllCoreThreads();
207            executors.put(id, executor);
208        }
209        NuxeoBlockingQueue queue = (NuxeoBlockingQueue) executor.getQueue();
210        // get merged contrib
211        // set active state
212        queue.setActive(workQueueDescriptor.isProcessingEnabled());
213        log.info("Activated work queue " + id + " " + workQueueDescriptor.toEffectiveString());
214    }
215
216    public void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) {
217        if (WorkQueueDescriptor.ALL_QUEUES.equals(workQueueDescriptor.id)) {
218            return;
219        }
220        WorkThreadPoolExecutor executor = executors.get(workQueueDescriptor.id);
221        executor.shutdownAndSuspend();
222        log.info("Deactivated work queue " + workQueueDescriptor.id);
223    }
224
225    public void registerWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
226        WorkQueuing q = newWorkQueuing(descr.getWorkQueuingClass());
227        registerWorkQueuing(q);
228    }
229
230    public void registerWorkQueuing(WorkQueuing q) {
231        closeQueuing();
232        queuing = q;
233    }
234
235    public void unregisterWorkQueuingDescriptor(WorkQueuingImplDescriptor descr) {
236        unregisterWorkQueing();
237    }
238
239    public void unregisterWorkQueing() {
240        closeQueuing();
241        queuing = newWorkQueuing(MemoryWorkQueuing.class);
242    }
243
244    protected WorkQueuing newWorkQueuing(Class<? extends WorkQueuing> klass) {
245        WorkQueuing q;
246        try {
247            Constructor<? extends WorkQueuing> ctor = klass.getConstructor(WorkManagerImpl.class,
248                    WorkQueueDescriptorRegistry.class);
249            q = ctor.newInstance(this, workQueueDescriptors);
250        } catch (ReflectiveOperationException | SecurityException e) {
251            throw new RuntimeException(e);
252        }
253        return q;
254    }
255
256    protected void closeQueuing() {
257        try {
258            shutdown(5, TimeUnit.SECONDS);
259        } catch (InterruptedException e) {
260            Thread.currentThread().interrupt(); // restore interrupted status
261            throw new RuntimeException(e);
262        }
263    }
264
265    protected boolean isQueuingEnabled(String queueId) {
266        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
267        return wqd == null ? false : wqd.isQueuingEnabled();
268    }
269
270    protected boolean isProcessingEnabled(String queueId) {
271        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
272        return wqd == null ? false : wqd.isProcessingEnabled();
273    }
274
275    // ----- WorkManager -----
276
277    @Override
278    public List<String> getWorkQueueIds() {
279        synchronized (workQueueDescriptors) {
280            return workQueueDescriptors.getQueueIds();
281        }
282    }
283
284    @Override
285    public WorkQueueDescriptor getWorkQueueDescriptor(String queueId) {
286        synchronized (workQueueDescriptors) {
287            return workQueueDescriptors.get(queueId);
288        }
289    }
290
291    @Override
292    public String getCategoryQueueId(String category) {
293        if (category == null) {
294            category = DEFAULT_CATEGORY;
295        }
296        String queueId = workQueueDescriptors.getQueueId(category);
297        if (queueId == null) {
298            queueId = DEFAULT_QUEUE_ID;
299        }
300        return queueId;
301    }
302
303    @Override
304    public int getApplicationStartedOrder() {
305        return EventServiceComponent.APPLICATION_STARTED_ORDER - 1;
306    }
307
308    @Override
309    public void applicationStarted(ComponentContext context) {
310        init();
311    }
312
    // set to true by init() and back to false at the end of shutdown()
    protected volatile boolean started = false;

    // true only while shutdown() is shutting the executors down
    protected volatile boolean shutdownInProgress = false;
316
    /**
     * Starts the work manager if it is not started yet: creates the completion synchronizer, initializes the queuing
     * implementation and activates every registered queue. Calls made while already started return immediately.
     */
    @Override
    public void init() {
        // cheap unsynchronized check first, repeated below under the lock
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            completionSynchronizer = new WorkCompletionSynchronizer();
            // the flag is raised before the queues are activated
            // NOTE(review): presumably so that the worker threads prestarted by activateQueue() already see
            // started == true in beforeExecute() - confirm before reordering
            started = true;
            queuing.init();
            for (String id : workQueueDescriptors.getQueueIds()) {
                activateQueue(workQueueDescriptors.get(id));
            }
        }
    }
334
335    protected WorkThreadPoolExecutor getExecutor(String queueId) {
336        if (!started) {
337            if (Framework.isTestModeSet() && !Framework.getRuntime().isShuttingDown()) {
338                LogFactory.getLog(WorkManagerImpl.class).warn("Lazy starting of work manager in test mode");
339                init();
340            } else {
341                throw new IllegalStateException("Work manager not started, could not access to executors");
342            }
343        }
344        WorkQueueDescriptor workQueueDescriptor;
345        synchronized (workQueueDescriptors) {
346            workQueueDescriptor = workQueueDescriptors.get(queueId);
347        }
348        if (workQueueDescriptor == null) {
349            throw new IllegalArgumentException("No such work queue: " + queueId);
350        }
351
352        return executors.get(queueId);
353    }
354
355    @Override
356    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
357        WorkThreadPoolExecutor executor = getExecutor(queueId);
358        boolean terminated = shutdownExecutors(Collections.singleton(executor), timeout, unit);
359        removeExecutor(queueId); // start afresh
360        return terminated;
361    }
362
363    protected boolean shutdownExecutors(Collection<WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
364            throws InterruptedException {
365        // mark executors as shutting down
366        for (WorkThreadPoolExecutor executor : list) {
367            executor.shutdownAndSuspend();
368        }
369        // don't wait anymore, running will always abort
370        return true;
371    }
372
373    protected long remainingMillis(long t0, long delay) {
374        long d = System.currentTimeMillis() - t0;
375        if (d > delay) {
376            return 0;
377        }
378        return delay - d;
379    }
380
381    protected synchronized void removeExecutor(String queueId) {
382        executors.remove(queueId);
383    }
384
    /**
     * Shuts down every executor and resets the manager to its not-started state.
     * <p>
     * {@link #shutdownExecutors} does not wait for termination, so running work may still be finishing when this
     * method returns.
     *
     * @param timeout the timeout, passed through to {@link #shutdownExecutors}
     * @param unit the timeout unit
     * @return the result of {@link #shutdownExecutors}
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        // while this flag is up, beforeExecute() re-schedules work instead of marking it running
        shutdownInProgress = true;
        try {
            return shutdownExecutors(executors.values(), timeout, unit);
        } finally {
            // back to the initial state, so that init() can start the manager again
            shutdownInProgress = false;
            started = false;
            completionSynchronizer = null;
            executors.clear();
        }
    }
397
398    protected class ShutdownListener implements RuntimeServiceListener {
399        @Override
400        public void handleEvent(RuntimeServiceEvent event) {
401            if (RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP != event.id) {
402                return;
403            }
404            Framework.removeListener(this);
405            closeQueuing();
406        }
407    }
408
409    /**
410     * A work instance and how to schedule it, for schedule-after-commit.
411     *
412     * @since 5.8
413     */
414    public class WorkScheduling implements Synchronization {
415        public final Work work;
416
417        public final Scheduling scheduling;
418
419        public WorkScheduling(Work work, Scheduling scheduling) {
420            this.work = work;
421            this.scheduling = scheduling;
422        }
423
424        @Override
425        public void beforeCompletion() {
426        }
427
428        @Override
429        public void afterCompletion(int status) {
430            if (status == Status.STATUS_COMMITTED) {
431                schedule(work, scheduling, false);
432            } else if (status == Status.STATUS_ROLLEDBACK) {
433                work.setWorkInstanceState(State.CANCELED);
434            } else {
435                throw new IllegalArgumentException("Unsupported transaction status " + status);
436            }
437        }
438    }
439
440    /**
441     * Creates non-daemon threads at normal priority.
442     */
443    private static class NamedThreadFactory implements ThreadFactory {
444
445        private final AtomicInteger threadNumber = new AtomicInteger();
446
447        private final ThreadGroup group;
448
449        private final String prefix;
450
451        public NamedThreadFactory(String prefix) {
452            SecurityManager sm = System.getSecurityManager();
453            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
454            this.prefix = prefix;
455        }
456
457        @Override
458        public Thread newThread(Runnable r) {
459            String name = prefix + threadNumber.incrementAndGet();
460            Thread thread = new Thread(group, r, name);
461            // do not set daemon
462            thread.setPriority(Thread.NORM_PRIORITY);
463            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
464
465                @Override
466                public void uncaughtException(Thread t, Throwable e) {
467                    LogFactory.getLog(WorkManagerImpl.class).error("Uncaught error on thread " + t.getName(), e);
468                }
469            });
470            return thread;
471        }
472    }
473
474    /**
475     * A {@link ThreadPoolExecutor} that keeps available the list of running tasks.
476     * <p>
477     * Completed tasks are passed to another queue.
478     * <p>
479     * The scheduled queue and completed list are passed as arguments and can have different implementations (in-memory,
480     * persisted, etc).
481     *
482     * @since 5.6
483     */
484    protected class WorkThreadPoolExecutor extends ThreadPoolExecutor {
485
486        protected final String queueId;
487
488        /**
489         * List of running Work instances, in order to be able to interrupt them if requested.
490         */
491        // @GuardedBy("itself")
492        protected final ConcurrentLinkedQueue<Work> running;
493
494        // metrics, in cluster mode these counters must be aggregated, no logic should rely on them
495        // Number of work scheduled by this instance
496        protected final Counter scheduledCount;
497
498        // Number of work currently running on this instance
499        protected final Counter runningCount;
500
501        // Number of work completed by this instance
502        protected final Counter completedCount;
503
504        protected final Timer workTimer;
505
506        protected WorkThreadPoolExecutor(String queueId, int corePoolSize, int maximumPoolSize, long keepAliveTime,
507                                         TimeUnit unit, ThreadFactory threadFactory) {
508            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queuing.initWorkQueue(queueId), threadFactory);
509            this.queueId = queueId;
510            running = new ConcurrentLinkedQueue<Work>();
511            // init metrics
512            scheduledCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "scheduled", "count"));
513            runningCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "running"));
514            completedCount = registry.counter(MetricRegistry.name("nuxeo", "works", queueId, "completed"));
515            workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", queueId, "total"));
516        }
517
518        public int getScheduledOrRunningSize() {
519            int ret = 0;
520            for (String queueId : getWorkQueueIds()) {
521                ret += getQueueSize(queueId, null);
522            }
523            return ret;
524        }
525
526        @Override
527        public void execute(Runnable r) {
528            throw new UnsupportedOperationException("use other api");
529        }
530
531        /**
532         * Executes the given task sometime in the future.
533         *
534         * @param work the work to execute
535         * @see #execute(Runnable)
536         */
537        public void execute(Work work) {
538            scheduledCount.inc();
539            submit(work);
540        }
541
542        /**
543         * go through the queue instead of using super.execute which may skip the queue and hand off to a thread
544         * directly
545         *
546         * @param work
547         * @throws RuntimeException
548         */
549        protected void submit(Work work) throws RuntimeException {
550            boolean added = queuing.workSchedule(queueId, work);
551            if (!added) {
552                queuing.removeScheduled(queueId, work.getId());
553                throw new RuntimeException("queue should have blocked");
554            }
555            // DO NOT super.execute(new WorkHolder(work));
556        }
557
558        @Override
559        protected void beforeExecute(Thread t, Runnable r) {
560            Work work = WorkHolder.getWork(r);
561            if (started == false || shutdownInProgress == true) {
562                work.setWorkInstanceState(State.SCHEDULED);
563                queuing.workSchedule(queueId, work);
564                return;
565            }
566            work.setWorkInstanceState(State.RUNNING);
567            queuing.workRunning(queueId, work);
568            running.add(work);
569            runningCount.inc();
570        }
571
572        @Override
573        protected void afterExecute(Runnable r, Throwable t) {
574            Work work = WorkHolder.getWork(r);
575            try {
576                if (work.isSuspending()) {
577                    return;
578                }
579                work.setWorkInstanceState(t == null ? State.COMPLETED : State.FAILED);
580                queuing.workCompleted(queueId, work);
581            } finally {
582                // metrics
583                running.remove(work);
584                runningCount.dec();
585                completedCount.inc();
586                workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
587                completionSynchronizer.signalCompletedWork();
588            }
589        }
590
591        // called during shutdown
592        // with tasks from the queue if new tasks are submitted
593        // or with tasks drained from the queue
594        protected void removedFromQueue(Runnable r) {
595            Work work = WorkHolder.getWork(r);
596            work.setWorkInstanceState(State.CANCELED);
597            completionSynchronizer.signalCompletedWork();
598        }
599
600        /**
601         * Initiates a shutdown of this executor and asks for work instances to suspend themselves.
602         */
603        public void shutdownAndSuspend() {
604            // don't consume the queue anymore
605            ((NuxeoBlockingQueue) getQueue()).setActive(false);
606            // shutdown the executor
607            shutdown();
608            // suspend and reschedule all running work
609            synchronized (running) {
610                for (Work work : running) {
611                    log.debug("re-scheduling " + work);
612                    work.setWorkInstanceState(State.SCHEDULED);
613                    queuing.workSchedule(queueId, work);
614                    work.setWorkInstanceSuspending();
615                }
616            }
617            shutdownNow();
618        }
619
620        /**
621         * Blocks until all work instances have completed after a shutdown and suspend request.
622         *
623         * @param timeout the time to wait
624         * @param unit the timeout unit
625         * @return true if all work stopped or was saved, false if some remaining after timeout
626         */
627        public boolean awaitTerminationOrSave(long timeout, TimeUnit unit) throws InterruptedException {
628            boolean terminated = super.awaitTermination(timeout, unit);
629            if (!terminated) {
630                // drain queue from remaining scheduled work
631                List<Runnable> drained = new ArrayList<>();
632                getQueue().drainTo(drained);
633                for (Runnable r : drained) {
634                    removedFromQueue(r);
635                }
636            }
637            // some work still remaining after timeout
638            return terminated;
639        }
640
641        public Work removeScheduled(String workId) {
642            Work w = queuing.removeScheduled(queueId, workId);
643            return w;
644        }
645
646    }
647
648    @Override
649    public void schedule(Work work) {
650        schedule(work, Scheduling.ENQUEUE, false);
651    }
652
653    @Override
654    public void schedule(Work work, boolean afterCommit) {
655        schedule(work, Scheduling.ENQUEUE, afterCommit);
656    }
657
658    @Override
659    public void schedule(Work work, Scheduling scheduling) {
660        schedule(work, scheduling, false);
661    }
662
663    @Override
664    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
665        String workId = work.getId();
666        String queueId = getCategoryQueueId(work.getCategory());
667        if (!isQueuingEnabled(queueId)) {
668            return;
669        }
670        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
671            return;
672        }
673        work.setWorkInstanceState(State.SCHEDULED);
674        WorkSchedulePath.newInstance(work);
675        if (log.isTraceEnabled()) {
676            log.trace("Scheduling work: " + work + " using queue: " + queueId, work.getSchedulePath().getStack());
677        } else if (log.isDebugEnabled()) {
678            log.debug("Scheduling work: " + work + " using queue: " + queueId);
679        }
680        switch (scheduling) {
681            case ENQUEUE:
682                break;
683            case CANCEL_SCHEDULED:
684                Work w = getExecutor(queueId).removeScheduled(workId);
685                if (w != null) {
686                    w.setWorkInstanceState(State.CANCELED);
687                    if (log.isDebugEnabled()) {
688                        log.debug("Canceling existing scheduled work before scheduling");
689                    }
690                }
691                break;
692            case IF_NOT_SCHEDULED:
693            case IF_NOT_RUNNING:
694            case IF_NOT_RUNNING_OR_SCHEDULED:
695                // TODO disabled for now because hasWorkInState uses isScheduled
696                // which is buggy
697                boolean disabled = Boolean.TRUE.booleanValue();
698                if (!disabled && hasWorkInState(workId, scheduling.state)) {
699                    // mark passed work as canceled
700                    work.setWorkInstanceState(State.CANCELED);
701                    if (log.isDebugEnabled()) {
702                        log.debug("Canceling schedule because found: " + scheduling);
703                    }
704                    return;
705
706                }
707                break;
708
709        }
710        getExecutor(queueId).execute(work);
711    }
712
713    /**
714     * Schedule after commit. Returns {@code false} if impossible (no transaction or transaction manager).
715     *
716     * @since 5.8
717     */
718    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
719        TransactionManager transactionManager;
720        try {
721            transactionManager = TransactionHelper.lookupTransactionManager();
722        } catch (NamingException e) {
723            transactionManager = null;
724        }
725        if (transactionManager == null) {
726            if (log.isDebugEnabled()) {
727                log.debug("Not scheduling work after commit because of missing transaction manager: " + work);
728            }
729            return false;
730        }
731        try {
732            Transaction transaction = transactionManager.getTransaction();
733            if (transaction == null) {
734                if (log.isDebugEnabled()) {
735                    log.debug("Not scheduling work after commit because of missing transaction: " + work);
736                }
737                return false;
738            }
739            int status = transaction.getStatus();
740            if (status == Status.STATUS_ACTIVE) {
741                if (log.isDebugEnabled()) {
742                    log.debug("Scheduling work after commit: " + work);
743                }
744                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
745                return true;
746            } else if (status == Status.STATUS_COMMITTED) {
747                // called in afterCompletion, we can schedule immediately
748                if (log.isDebugEnabled()) {
749                    log.debug("Scheduling work immediately: " + work);
750                }
751                return false;
752            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
753                if (log.isDebugEnabled()) {
754                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work);
755                }
756                work.setWorkInstanceState(State.CANCELED);
757                return true;
758            } else {
759                if (log.isDebugEnabled()) {
760                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
761                            + work);
762                }
763                return false;
764            }
765        } catch (SystemException | RollbackException e) {
766            log.error("Cannot schedule after commit", e);
767            return false;
768        }
769    }
770
771    @Override
772    @Deprecated
773    public Work find(Work work, State state, boolean useEquals, int[] pos) {
774        if (pos != null) {
775            pos[0] = 0; // compat
776        }
777        String workId = work.getId();
778        return queuing.find(workId, state);
779    }
780
781    @Override
782    public Work find(String workId, State state) {
783        return queuing.find(workId, state);
784    }
785
786    @Override
787    public String findResult(String workId) {
788        Work work = find(workId, State.COMPLETED);
789        return work != null ? work.getWorkInstanceResult() : null;
790    }
791
792    /**
793     * @param state SCHEDULED, RUNNING or null for both
794     */
795    protected boolean hasWorkInState(String workId, State state) {
796        return queuing.isWorkInState(workId, state);
797    }
798
799    @Override
800    public State getWorkState(String workId) {
801        return queuing.getWorkState(workId);
802    }
803
804    @Override
805    public List<Work> listWork(String queueId, State state) {
806        // don't return scheduled after commit
807        return queuing.listWork(queueId, state);
808    }
809
810    @Override
811    public List<String> listWorkIds(String queueId, State state) {
812        return queuing.listWorkIds(queueId, state);
813    }
814
815    @Override
816    public int getQueueSize(String queueId, State state) {
817        if (state == null) {
818            return getScheduledOrRunningSize(queueId);
819        }
820        if (state == State.SCHEDULED) {
821            return getScheduledSize(queueId);
822        } else if (state == State.RUNNING) {
823            return getRunningSize(queueId);
824        } else if (state == State.COMPLETED) {
825            return getCompletedSize(queueId);
826        } else {
827            throw new IllegalArgumentException(String.valueOf(state));
828        }
829    }
830
831    @Override
832    @Deprecated
833    public int getNonCompletedWorkSize(String queueId) {
834        return getScheduledOrRunningSize(queueId);
835    }
836
837    protected int getScheduledSize(String queueId) {
838        return queuing.count(queueId, State.SCHEDULED);
839    }
840
841    protected int getRunningSize(String queueId) {
842        return queuing.count(queueId, State.RUNNING);
843    }
844
845    protected int getScheduledOrRunningSize(String queueId) {
846        return getScheduledSize(queueId) + getRunningSize(queueId);
847    }
848
849    protected int getCompletedSize(String queueId) {
850        return queuing.count(queueId, State.COMPLETED);
851    }
852
853    @Override
854    public boolean awaitCompletion(long duration, TimeUnit unit) throws InterruptedException {
855        return awaitCompletion(null, duration, unit);
856    }
857
858    @Override
859    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
860        if (!isStarted()) {
861            return true;
862        }
863        SequenceTracer.start("awaitCompletion on " + ((queueId == null) ? "all queues" : queueId));
864        long durationInMs = TimeUnit.MILLISECONDS.convert(duration, unit);
865        long deadline = getTimestampAfter(durationInMs);
866        int pause = (int) Math.min(duration, 500L);
867        log.debug("awaitForCompletion " + durationInMs + " ms");
868        do {
869            if (noScheduledOrRunningWork(queueId)) {
870                log.debug("Completed");
871                completionSynchronizer.signalCompletedWork();
872                SequenceTracer.stop("done");
873                return true;
874            }
875            completionSynchronizer.waitForCompletedWork(pause);
876        } while (System.currentTimeMillis() < deadline);
877        log.info("awaitCompletion timeout after " + durationInMs + " ms");
878        SequenceTracer.destroy("timeout after " + durationInMs + " ms");
879        return false;
880    }
881
882    protected long getTimestampAfter(long durationInMs) {
883        long ret = System.currentTimeMillis() + durationInMs;
884        if (ret < 0) {
885            ret = Long.MAX_VALUE;
886        }
887        return ret;
888    }
889
890    protected boolean noScheduledOrRunningWork(String queueId) {
891        if (queueId != null) {
892            boolean ret = getQueueSize(queueId, null) == 0;
893            if (ret == false) {
894                if (log.isTraceEnabled()) {
895                    log.trace(queueId + " not empty, sched: " + getQueueSize(queueId, State.SCHEDULED) +
896                            ", running: " + getQueueSize(queueId, State.RUNNING));
897                }
898            }
899            return ret;
900        }
901        for (String id : getWorkQueueIds()) {
902            if (getQueueSize(id, null) > 0) {
903                if (log.isTraceEnabled()) {
904                    log.trace(id + " not empty, sched: " + getQueueSize(id, State.SCHEDULED) +
905                            ", running: " + getQueueSize(id, State.RUNNING));
906                }
907                return false;
908            }
909        }
910        return true;
911    }
912
913    @Override
914    public synchronized void clearCompletedWork(String queueId) {
915        queuing.clearCompletedWork(queueId, 0);
916    }
917
918    @Override
919    public synchronized void clearCompletedWork(long completionTime) {
920        for (String queueId : queuing.getCompletedQueueIds()) {
921            queuing.clearCompletedWork(queueId, completionTime);
922        }
923    }
924
925    @Override
926    public synchronized void cleanup() {
927        log.debug("Clearing old completed work");
928        for (String queueId : queuing.getCompletedQueueIds()) {
929            WorkQueueDescriptor workQueueDescriptor = workQueueDescriptors.get(queueId);
930            if (workQueueDescriptor == null) {
931                // unknown queue
932                continue;
933            }
934            long delay = workQueueDescriptor.getClearCompletedAfterSeconds() * 1000L;
935            if (delay > 0) {
936                long completionTime = System.currentTimeMillis() - delay;
937                queuing.clearCompletedWork(queueId, completionTime);
938            }
939        }
940    }
941
942    @Override
943    public boolean isStarted() {
944        return started && !shutdownInProgress;
945    }
946
947}