001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import static java.lang.Math.min;
022import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.PREFIX_OPTION;
023import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.STORE_NAME_OPTION;
024import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.STORE_TTL_OPTION;
025import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.THRESHOLD_SIZE_OPTION;
026import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED;
027import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
028
029import java.time.Duration;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.concurrent.TimeUnit;
035import java.util.function.Predicate;
036
037import javax.naming.NamingException;
038import javax.transaction.RollbackException;
039import javax.transaction.Status;
040import javax.transaction.Synchronization;
041import javax.transaction.SystemException;
042import javax.transaction.Transaction;
043import javax.transaction.TransactionManager;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.nuxeo.ecm.core.event.EventServiceComponent;
048import org.nuxeo.ecm.core.work.api.Work;
049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
051import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
052import org.nuxeo.lib.stream.codec.Codec;
053import org.nuxeo.lib.stream.computation.ComputationPolicy;
054import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
055import org.nuxeo.lib.stream.computation.Record;
056import org.nuxeo.lib.stream.computation.RecordFilter;
057import org.nuxeo.lib.stream.computation.RecordFilterChain;
058import org.nuxeo.lib.stream.computation.Settings;
059import org.nuxeo.lib.stream.computation.StreamManager;
060import org.nuxeo.lib.stream.computation.StreamProcessor;
061import org.nuxeo.lib.stream.computation.Topology;
062import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl;
063import org.nuxeo.lib.stream.log.LogLag;
064import org.nuxeo.lib.stream.log.LogManager;
065import org.nuxeo.lib.stream.log.LogOffset;
066import org.nuxeo.lib.stream.log.Name;
067import org.nuxeo.runtime.api.Framework;
068import org.nuxeo.runtime.codec.CodecService;
069import org.nuxeo.runtime.metrics.NuxeoMetricSet;
070import org.nuxeo.runtime.model.ComponentContext;
071import org.nuxeo.runtime.model.ComponentManager;
072import org.nuxeo.runtime.model.Descriptor;
073import org.nuxeo.runtime.services.config.ConfigurationService;
074import org.nuxeo.runtime.stream.StreamService;
075import org.nuxeo.runtime.transaction.TransactionHelper;
076
077import io.dropwizard.metrics5.MetricName;
078
079/**
080 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed
081 * for performance reason.
082 *
083 * @since 9.3
084 */
085public class StreamWorkManager extends WorkManagerImpl {
086
    /** Logger shared by this work manager and its inner classes. */
    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);

    /** Framework property read by {@link #getLogConfig()} to name the Log configuration. */
    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";

    /** Value returned by {@link #getLogConfig()} when {@link #WORK_LOG_CONFIG_PROP} is not set. */
    public static final String DEFAULT_WORK_LOG_CONFIG = "work";

    /** Framework property read by {@link #getCodecName()} to select the codec used for work records. */
    public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec";

    /** Codec name used when {@link #WORK_CODEC_PROP} is not set. */
    public static final String DEFAULT_WORK_CODEC = "legacy";

    /** Framework property holding the factor applied to a queue's max threads to compute its partitions. */
    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";

    /** Over provisioning factor, as a string, used when {@link #WORK_OVER_PROVISIONING_PROP} is not set. */
    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";

    /** Default concurrency handed to the computation {@link Settings} in {@link #initTopology()}. */
    public static final int DEFAULT_CONCURRENCY = 4;

    // @since 11.1
    // last metrics computed by getMetricsWithNuxeoClassLoader, kept as a one entry cache
    protected WorkQueueMetrics lastMetrics;

    // wall clock time in ms at which lastMetrics was computed
    protected long lastMetricTime;

    // maximum age in ms of lastMetrics before it gets recomputed
    // NOTE(review): named like a constant but declared as a mutable instance field
    protected long CACHE_LAST_METRIC_DURATION_MS = 1000;

    // @since 11.1
    // prefix prepended to a queue id to build the name of its stream
    public static final String NAMESPACE_PREFIX = "work/";

    /**
     * Configuration key read in {@link #start(ComponentContext)} to initialize {@link #stateTTL}.
     *
     * @since 10.2
     */
    public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds";

    /**
     * Configuration key read in {@link #start(ComponentContext)} to initialize {@link #storeState}.
     *
     * @since 10.2
     */
    public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled";

    /**
     * State TTL used when {@link #STATETTL_KEY} is not configured.
     *
     * @since 10.2
     */
    public static final long STATETTL_DEFAULT_VALUE = 3600;

    /**
     * Configuration key holding the class name of the optional record filter, see {@link #getRecordFilterClass()}.
     *
     * @since 11.1
     */
    public static final String COMPUTATION_FILTER_CLASS_KEY = "nuxeo.stream.work.computation.filter.class";

    /**
     * Configuration key forwarded to the record filter as its store name option.
     *
     * @since 11.1
     */
    public static final String COMPUTATION_FILTER_STORE_KEY = "nuxeo.stream.work.computation.filter.storeName";

    /**
     * Configuration key forwarded to the record filter as its store TTL option.
     *
     * @since 11.1
     */
    public static final String COMPUTATION_FILTER_STORE_TTL_KEY = "nuxeo.stream.work.computation.filter.storeTTL";

    /**
     * Configuration key forwarded to the record filter as its threshold size option.
     *
     * @since 11.1
     */
    public static final String COMPUTATION_FILTER_THRESHOLD_SIZE_KEY = "nuxeo.stream.work.computation.filter.thresholdSize";

    /**
     * Configuration key forwarded to the record filter as its prefix option.
     *
     * @since 11.1
     */
    public static final String COMPUTATION_FILTER_PREFIX_KEY = "nuxeo.stream.work.computation.filter.storeKeyPrefix";

    /** Topology holding one computation per work queue that has processing enabled. */
    protected Topology topology;

    /** Topology holding one computation per work queue that has processing disabled; only registered, never run here. */
    protected Topology topologyDisabled;

    /** Computation settings (concurrency, partitions, codec, policy, filter) built by {@link #initTopology()}. */
    protected Settings settings;

    /** Processor created in {@link #init()} from {@link #topology}; started after runtime start, stopped on shutdown. */
    protected StreamProcessor streamProcessor;

    /** Log manager used to read the lag of the work streams. */
    protected LogManager logManager;

    /** Stream manager used to register the topologies and to append work records. */
    protected StreamManager streamManager;

    /** Whether work states are recorded through {@code WorkStateHelper}; set from {@link #STORESTATE_KEY}. */
    protected boolean storeState;

    /** TTL passed to {@code WorkStateHelper} when storing states and offsets; presumably seconds, per the key name. */
    protected long stateTTL;
168
169    protected int getOverProvisioningFactor() {
170        // Enable over provisioning only if the log can be distributed
171        if (getLogManager().supportSubscribe()) {
172            return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING));
173        }
174        return 1;
175    }
176
177    protected String getCodecName() {
178        return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC);
179    }
180
181    protected Codec<Record> getCodec() {
182        return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class);
183    }
184
185    @Override
186    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
187        String queueId = getCategoryQueueId(work.getCategory());
188        if (log.isDebugEnabled()) {
189            log.debug(String.format(
190                    "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
191                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
192        }
193        if (!isQueuingEnabled(queueId)) {
194            log.info("Queue disabled, scheduling canceled: " + queueId);
195            return;
196        }
197        if (CANCEL_SCHEDULED.equals(scheduling)) {
198            if (storeState) {
199                if (WorkStateHelper.getState(work.getId()) != null) {
200                    WorkStateHelper.setCanceled(work.getId());
201                }
202            } else {
203                log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s",
204                        STORESTATE_KEY, work));
205            }
206            return;
207        }
208        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
209            return;
210        }
211        WorkSchedulePath.newInstance(work);
212        String key = work.getPartitionKey();
213        LogOffset offset;
214        try {
215            offset = streamManager.append(NAMESPACE_PREFIX + queueId, Record.of(key, WorkComputation.serialize(work)));
216        } catch (IllegalArgumentException e) {
217            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
218                    NAMESPACE_PREFIX + queueId));
219            return;
220        }
221        if (work.isCoalescing()) {
222            WorkStateHelper.setLastOffset(work.getId(), offset.offset(), stateTTL);
223        }
224        if (work.isGroupJoin()) {
225            if (log.isDebugEnabled()) {
226                log.debug(String.format("Submit Work: %s to GroupJoin: %s, offset: %s", work.getId(),
227                        work.getPartitionKey(), offset));
228            }
229            WorkStateHelper.addGroupJoinWork(work.getPartitionKey());
230        }
231        if (storeState) {
232            WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL);
233        }
234    }
235
236    @Override
237    public int getApplicationStartedOrder() {
238        // start before the WorkManagerImpl
239        return EventServiceComponent.APPLICATION_STARTED_ORDER - 2;
240    }
241
242    @Override
243    public void start(ComponentContext context) {
244        super.start(context);
245        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
246        storeState = configuration.isBooleanTrue(STORESTATE_KEY);
247        stateTTL = configuration.getLong(STATETTL_KEY, STATETTL_DEFAULT_VALUE);
248    }
249
250    protected RecordFilterChain getRecordFilter() {
251        String filterClass = getRecordFilterClass();
252        if (filterClass == null) {
253            return null;
254        }
255        RecordFilterChain filter = new RecordFilterChainImpl();
256        Class<? extends RecordFilter> klass;
257        try {
258            klass = (Class<RecordFilter>) Class.forName(filterClass);
259            if (!RecordFilter.class.isAssignableFrom(klass)) {
260                throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass);
261            }
262            RecordFilter ret = klass.getDeclaredConstructor().newInstance();
263            ret.init(getRecordFilterOptions());
264            filter.addFilter(ret);
265        } catch (ReflectiveOperationException e) {
266            throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass, e);
267        }
268        return filter;
269    }
270
271    protected Map<String, String> getRecordFilterOptions() {
272        Map<String, String> ret = new HashMap<>();
273        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
274        configuration.getString(COMPUTATION_FILTER_STORE_KEY).ifPresent(value -> ret.put(STORE_NAME_OPTION, value));
275        configuration.getString(COMPUTATION_FILTER_PREFIX_KEY).ifPresent(value -> ret.put(PREFIX_OPTION, value));
276        configuration.getInteger(COMPUTATION_FILTER_THRESHOLD_SIZE_KEY)
277                     .ifPresent(value -> ret.put(THRESHOLD_SIZE_OPTION, value.toString()));
278        configuration.getString(COMPUTATION_FILTER_STORE_TTL_KEY).ifPresent(value -> ret.put(STORE_TTL_OPTION, value));
279        return ret;
280    }
281
282    protected String getRecordFilterClass() {
283        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
284        return configuration.getString(COMPUTATION_FILTER_CLASS_KEY).orElse(null);
285    }
286
    /**
     * Initializes the stream based work manager once: deactivates the default work manager component, builds the
     * topologies, registers them and creates the stream processor.
     * <p>
     * The processor is only created here, it is started by the {@link ComponentListener} installed at the end.
     */
    @Override
    public void init() {
        // fast path without locking once initialized
        if (started) {
            return;
        }
        // flag the default WorkManagerImpl component as inactive
        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
        wmi.active = false;
        log.debug("Initializing");
        synchronized (this) {
            // check again under the lock, another thread may have completed the initialization meanwhile
            if (started) {
                return;
            }
            // each queue id is registered as its own category
            getDescriptors(QUEUES_EP).forEach(d -> categoryToQueueId.put(d.getId(), d.getId()));
            index();
            // builds topology, topologyDisabled and settings used below
            initTopology();
            logManager = getLogManager();
            streamManager = getStreamManager();
            // the disabled topology is registered only, no processor is created for it
            streamManager.register("StreamWorkManagerDisable", topologyDisabled, settings);
            streamProcessor = streamManager.registerAndCreateProcessor("StreamWorkManager", topology, settings);
            started = true;
            // the listener starts the processor after the runtime start and stops it before the runtime stop
            new ComponentListener().install();
            log.info("Initialized");
        }
    }
311
312    class ComponentListener implements ComponentManager.Listener {
313        @Override
314        public void beforeStop(ComponentManager mgr, boolean isStandby) {
315            if (!shutdown(10, TimeUnit.SECONDS)) {
316                log.error("Some processors are still active");
317            }
318        }
319
320        @Override
321        public void afterStart(ComponentManager mgr, boolean isResume) {
322            if (isProcessingDisabled()) {
323                log.warn("WorkManager processing has been disabled on this node");
324                return;
325            }
326            streamProcessor.start();
327            for (Descriptor d : getDescriptors(QUEUES_EP)) {
328                activateQueueMetrics(d.getId());
329            }
330        }
331
332        @Override
333        public void afterStop(ComponentManager mgr, boolean isStandby) {
334            Framework.getRuntime().getComponentManager().removeListener(this);
335            for (Descriptor d : getDescriptors(QUEUES_EP)) {
336                deactivateQueueMetrics(d.getId());
337            }
338        }
339    }
340
341    protected LogManager getLogManager() {
342        String config = getLogConfig();
343        log.info("Init StreamWorkManager with Log configuration: " + config);
344        StreamService service = Framework.getService(StreamService.class);
345        return service.getLogManager();
346    }
347
348    protected StreamManager getStreamManager() {
349        StreamService service = Framework.getService(StreamService.class);
350        return service.getStreamManager();
351    }
352
353    protected String getLogConfig() {
354        return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
355    }
356
357    @Override
358    public boolean isProcessingEnabled(String queueId) {
359        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
360        return wqd != null && wqd.isProcessingEnabled();
361    }
362
363    protected void initTopology() {
364        List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
365        // create the single topology with one root per work pool
366        Topology.Builder builder = Topology.builder();
367        descriptors.stream().filter(WorkQueueDescriptor::isProcessingEnabled).forEach(d -> builder.addComputation(
368                           () -> new WorkComputation(NAMESPACE_PREFIX + d.getId()),
369                           Collections.singletonList(INPUT_1 + ":" + NAMESPACE_PREFIX + d.getId())));
370        topology = builder.build();
371        // create a topology for the disabled work pools in order to init their input streams
372        Topology.Builder builderDisabled = Topology.builder();
373        descriptors.stream()
374                   .filter(Predicate.not(WorkQueueDescriptor::isProcessingEnabled))
375                   .forEach(d -> builderDisabled.addComputation(() -> new WorkComputation(d.getId()),
376                           Collections.singletonList(INPUT_1 + ":" + NAMESPACE_PREFIX + d.getId())));
377        topologyDisabled = builderDisabled.build();
378        // The retry policy is handled at AbstractWork level, but we want to skip failure
379        ComputationPolicy policy = new ComputationPolicyBuilder().continueOnFailure(true).build();
380        RecordFilterChain filter = getRecordFilter();
381        settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec(), policy, filter);
382        descriptors.forEach(item -> settings.setConcurrency(item.getId(), item.getMaxThreads()));
383        descriptors.forEach(item -> settings.setPartitions(item.getId(), getPartitions(item.getMaxThreads())));
384    }
385
386    protected int getPartitions(int maxThreads) {
387        if (maxThreads == 1) {
388            // when the pool size is one the we don't want any concurrency
389            return 1;
390        }
391        return getOverProvisioningFactor() * maxThreads;
392    }
393
394    public class WorkScheduling implements Synchronization {
395        public final Work work;
396
397        public final Scheduling scheduling;
398
399        public WorkScheduling(Work work, Scheduling scheduling) {
400            this.work = work;
401            this.scheduling = scheduling;
402        }
403
404        @Override
405        public void beforeCompletion() {
406        }
407
408        @Override
409        public void afterCompletion(int status) {
410            if (status == Status.STATUS_COMMITTED) {
411                StreamWorkManager.this.schedule(work, scheduling, false);
412            } else {
413                if (status != Status.STATUS_ROLLEDBACK) {
414                    throw new IllegalArgumentException("Unsupported transaction status " + status);
415                }
416            }
417
418        }
419    }
420
421    @Override
422    void activateQueue(WorkQueueDescriptor config) {
423        // queue processing is activated only from component listener afterStart
424        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
425            throw new IllegalArgumentException("cannot activate all queues");
426        }
427        log.info("Activated queue " + config.id + " " + config.toString());
428        if (config.isProcessingEnabled()) {
429            activateQueueMetrics(config.id);
430        }
431    }
432
433    @Override
434    void deactivateQueue(WorkQueueDescriptor config) {
435        // queue processing is deactivated only on shutdown
436        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
437            throw new IllegalArgumentException("cannot deactivate all queues");
438        }
439        if (config.isProcessingEnabled()) {
440            deactivateQueueMetrics(config.id);
441        }
442        log.info("Deactivated work queue not supported: " + config.id);
443    }
444
445    @Override
446    protected void activateQueueMetrics(String queueId) {
447        NuxeoMetricSet queueMetrics = new NuxeoMetricSet(MetricName.build("nuxeo.works.global.queue").tagged("queue", queueId));
448        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled");
449        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running");
450        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed");
451        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled");
452        registry.registerAll(queueMetrics);
453    }
454
455    @Override
456    protected void deactivateQueueMetrics(String queueId) {
457        String queueMetricsName = MetricName.build("nuxeo.works.global.queue").tagged("queue", queueId).getKey();
458        registry.removeMatching((name, metric) -> name.getKey().startsWith(queueMetricsName));
459    }
460
461    @Override
462    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) {
463        log.warn("Shutdown a queue is not supported with computation implementation");
464        return false;
465    }
466
467    @Override
468    public boolean shutdown(long timeout, TimeUnit timeUnit) {
469        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
470        shutdownInProgress = true;
471        try {
472            long shutdownDelay = Framework.getService(ConfigurationService.class).getLong(SHUTDOWN_DELAY_MS_KEY, 0);
473            boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay)));
474            if (!ret) {
475                log.error("Not able to stop worker pool within the timeout.");
476            }
477            return ret;
478        } finally {
479            shutdownInProgress = false;
480        }
481    }
482
483    @Override
484    public int getQueueSize(String queueId, Work.State state) {
485        switch (state) {
486            case SCHEDULED:
487                return getMetrics(queueId).getScheduled().intValue();
488            case RUNNING:
489                return getMetrics(queueId).getRunning().intValue();
490            default:
491                return 0;
492        }
493    }
494
495    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
496        long now = System.currentTimeMillis();
497        if (lastMetrics != null && lastMetrics.queueId == queueId
498                && (now - lastMetricTime) < CACHE_LAST_METRIC_DURATION_MS) {
499            return lastMetrics;
500        }
501        // JMX threads have distinct class loader that need to be changed to get metrics
502        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
503        try {
504            Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader());
505            lastMetrics = getMetrics(queueId);
506            lastMetricTime = System.currentTimeMillis();
507            return lastMetrics;
508        } finally {
509            Thread.currentThread().setContextClassLoader(classLoader);
510        }
511    }
512
513    @Override
514    public WorkQueueMetrics getMetrics(String queueId) {
515        LogLag lag = logManager.getLag(Name.ofUrn(NAMESPACE_PREFIX + queueId), Name.ofUrn(NAMESPACE_PREFIX + queueId));
516        long running = 0;
517        if (lag.lag() > 0) {
518            // we don't have the exact running metric
519            // give an approximation that can be higher that actual one because of the over provisioning
520            running = min(lag.lag(), settings.getPartitions(queueId));
521        }
522        return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0);
523    }
524
525    @Override
526    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
527        if (queueId != null) {
528            return awaitCompletionOnQueue(queueId, duration, unit);
529        }
530        for (Descriptor item : getDescriptors(QUEUES_EP)) {
531            if (!awaitCompletionOnQueue(item.getId(), duration, unit)) {
532                return false;
533            }
534        }
535        return true;
536    }
537
538    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException {
539        if (!isStarted()) {
540            return true;
541        }
542        log.debug("awaitCompletion " + queueId + " starting");
543        // wait for the lag to be null
544        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
545        long deadline = System.currentTimeMillis() + durationMs;
546        while (System.currentTimeMillis() < deadline) {
547            Thread.sleep(100);
548            int lag = getMetrics(queueId).getScheduled().intValue();
549            if (lag == 0) {
550                if (log.isDebugEnabled()) {
551                    log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId));
552                }
553                return true;
554            }
555            if (!log.isDebugEnabled()) {
556                log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId));
557            }
558        }
559        log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId)));
560        return false;
561    }
562
563    /**
564     * @deprecated since 10.2 because unused
565     */
566    @Deprecated
567    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
568            throws InterruptedException {
569        if (!isStarted()) {
570            return true;
571        }
572        // wait that the low watermark get stable
573        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
574        long deadline = System.currentTimeMillis() + durationMs;
575        long lowWatermark = getLowWaterMark(queueId);
576        while (System.currentTimeMillis() < deadline) {
577            Thread.sleep(100);
578            long wm = getLowWaterMark(queueId);
579            if (wm == lowWatermark) {
580                log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm);
581                return true;
582            }
583            if (log.isDebugEnabled()) {
584                log.debug("awaitCompletion low wm  for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: "
585                        + (wm - lowWatermark));
586            }
587            lowWatermark = wm;
588        }
589        log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0));
590        return false;
591    }
592
593    protected long getLowWaterMark(String queueId) {
594        if (queueId != null) {
595            return streamProcessor.getLowWatermark(queueId);
596        }
597        return streamProcessor.getLowWatermark();
598    }
599
600    @Override
601    public Work.State getWorkState(String workId) {
602        if (!storeState) {
603            return null;
604        }
605        return WorkStateHelper.getState(workId);
606    }
607
608    @Override
609    public Work find(String s, Work.State state) {
610        // always not found
611        return null;
612    }
613
614    @Override
615    public List<Work> listWork(String s, Work.State state) {
616        return Collections.emptyList();
617    }
618
619    @Override
620    public List<String> listWorkIds(String s, Work.State state) {
621        return Collections.emptyList();
622    }
623
624    @Override
625    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
626        TransactionManager transactionManager;
627        try {
628            transactionManager = TransactionHelper.lookupTransactionManager();
629        } catch (NamingException e) {
630            transactionManager = null;
631        }
632        if (transactionManager == null) {
633            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
634            return false;
635        }
636        try {
637            Transaction transaction = transactionManager.getTransaction();
638            if (transaction == null) {
639                if (log.isDebugEnabled()) {
640                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
641                }
642                return false;
643            }
644            int status = transaction.getStatus();
645            if (status == Status.STATUS_ACTIVE) {
646                if (log.isDebugEnabled()) {
647                    log.debug("Scheduled after commit: " + work.getId());
648                }
649                transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling));
650                return true;
651            } else if (status == Status.STATUS_COMMITTED) {
652                // called in afterCompletion, we can schedule immediately
653                if (log.isDebugEnabled()) {
654                    log.debug("Scheduled immediately: " + work.getId());
655                }
656                return false;
657            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
658                if (log.isDebugEnabled()) {
659                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
660                }
661                return true;
662            } else {
663                if (log.isDebugEnabled()) {
664                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
665                            + work.getId());
666                }
667                return false;
668            }
669        } catch (SystemException | RollbackException e) {
670            log.error("Cannot schedule after commit", e);
671            return false;
672        }
673    }
674
675    @Override
676    public boolean supportsProcessingDisabling() {
677        return true;
678    }
679
680}