001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import static java.lang.Math.min;
022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED;
023
024import java.time.Duration;
025import java.util.Collections;
026import java.util.List;
027import java.util.concurrent.TimeUnit;
028
029import javax.naming.NamingException;
030import javax.transaction.RollbackException;
031import javax.transaction.Status;
032import javax.transaction.Synchronization;
033import javax.transaction.SystemException;
034import javax.transaction.Transaction;
035import javax.transaction.TransactionManager;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.event.EventServiceComponent;
040import org.nuxeo.ecm.core.work.api.Work;
041import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
042import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
043import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
044import org.nuxeo.lib.stream.codec.Codec;
045import org.nuxeo.lib.stream.computation.Record;
046import org.nuxeo.lib.stream.computation.Settings;
047import org.nuxeo.lib.stream.computation.StreamProcessor;
048import org.nuxeo.lib.stream.computation.Topology;
049import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
050import org.nuxeo.lib.stream.log.LogAppender;
051import org.nuxeo.lib.stream.log.LogLag;
052import org.nuxeo.lib.stream.log.LogManager;
053import org.nuxeo.lib.stream.log.LogOffset;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.codec.CodecService;
056import org.nuxeo.runtime.metrics.NuxeoMetricSet;
057import org.nuxeo.runtime.model.ComponentContext;
058import org.nuxeo.runtime.model.ComponentManager;
059import org.nuxeo.runtime.model.Descriptor;
060import org.nuxeo.runtime.services.config.ConfigurationService;
061import org.nuxeo.runtime.stream.StreamService;
062import org.nuxeo.runtime.transaction.TransactionHelper;
063
064import com.codahale.metrics.MetricRegistry;
065
066/**
067 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed
068 * for performance reason.
069 *
070 * @since 9.3
071 */
072public class StreamWorkManager extends WorkManagerImpl {
073
074    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);
075
076    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";
077
078    public static final String DEFAULT_WORK_LOG_CONFIG = "work";
079
080    public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec";
081
082    public static final String DEFAULT_WORK_CODEC = "legacy";
083
084    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";
085
086    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";
087
088    public static final int DEFAULT_CONCURRENCY = 4;
089
090    /**
091     * @since 10.2
092     */
093    public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds";
094
095    /**
096     * @since 10.2
097     */
098    public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled";
099
100    /**
101     * @since 10.2
102     */
103    public static final String STATETTL_DEFAULT_VALUE = "3600";
104
105    protected Topology topology;
106
107    protected Settings settings;
108
109    protected StreamProcessor streamProcessor;
110
111    protected LogManager logManager;
112
113    protected boolean storeState;
114
115    protected long stateTTL;
116
117    protected int getOverProvisioningFactor() {
118        // Enable over provisioning only if the log can be distributed
119        if (getLogManager().supportSubscribe()) {
120            return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING));
121        }
122        return 1;
123    }
124
125    protected String getCodecName() {
126        return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC);
127    }
128
129    protected Codec<Record> getCodec() {
130        return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class);
131    }
132
133    @Override
134    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
135        String queueId = getCategoryQueueId(work.getCategory());
136        if (log.isDebugEnabled()) {
137            log.debug(String.format(
138                    "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
139                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
140        }
141        if (!isQueuingEnabled(queueId)) {
142            log.info("Queue disabled, scheduling canceled: " + queueId);
143            return;
144        }
145        if (CANCEL_SCHEDULED.equals(scheduling)) {
146            if (storeState) {
147                if (WorkStateHelper.getState(work.getId()) != null) {
148                    WorkStateHelper.setCanceled(work.getId());
149                }
150            } else {
151                log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s",
152                        STORESTATE_KEY, work));
153            }
154            return;
155        }
156        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
157            return;
158        }
159        WorkSchedulePath.newInstance(work);
160        // We don't need to set a codec because appender is initialized with proper codec during processor init
161        LogAppender<Record> appender = logManager.getAppender(queueId);
162        if (appender == null) {
163            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
164                    queueId));
165            return;
166        }
167        String key = work.getPartitionKey();
168        LogOffset offset = appender.append(key, Record.of(key, WorkComputation.serialize(work)));
169        if (work.isCoalescing()) {
170            WorkStateHelper.setLastOffset(work.getId(), offset.offset(), stateTTL);
171        }
172        if (storeState) {
173            WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL);
174        }
175    }
176
177    @Override
178    public int getApplicationStartedOrder() {
179        // start before the WorkManagerImpl
180        return EventServiceComponent.APPLICATION_STARTED_ORDER - 2;
181    }
182
183    @Override
184    public void start(ComponentContext context) {
185        super.start(context);
186        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
187        storeState = configuration.isBooleanPropertyTrue(STORESTATE_KEY);
188        stateTTL = Long.parseLong(configuration.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE));
189    }
190
191    @Override
192    public void init() {
193        if (started) {
194            return;
195        }
196        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
197        wmi.active = false;
198        log.debug("Initializing");
199        synchronized (this) {
200            if (started) {
201                return;
202            }
203            getDescriptors(QUEUES_EP).forEach(d -> categoryToQueueId.put(d.getId(), d.getId()));
204            index();
205            initTopology();
206            logManager = getLogManager();
207            streamProcessor = new LogStreamProcessor(logManager);
208            streamProcessor.init(topology, settings);
209            started = true;
210            new ComponentListener().install();
211            log.info("Initialized");
212        }
213    }
214
215    class ComponentListener implements ComponentManager.Listener {
216        @Override
217        public void beforeStop(ComponentManager mgr, boolean isStandby) {
218            if (!shutdown(10, TimeUnit.SECONDS)) {
219                log.error("Some processors are still active");
220            }
221        }
222
223        @Override
224        public void afterStart(ComponentManager mgr, boolean isResume) {
225            streamProcessor.start();
226            for (Descriptor d : getDescriptors(QUEUES_EP)) {
227                activateQueueMetrics(d.getId());
228            }
229        }
230
231        @Override
232        public void afterStop(ComponentManager mgr, boolean isStandby) {
233            Framework.getRuntime().getComponentManager().removeListener(this);
234            for (Descriptor d : getDescriptors(QUEUES_EP)) {
235                deactivateQueueMetrics(d.getId());
236            }
237        }
238    }
239
240    protected LogManager getLogManager() {
241        String config = getLogConfig();
242        log.info("Init StreamWorkManager with Log configuration: " + config);
243        StreamService service = Framework.getService(StreamService.class);
244        return service.getLogManager(getLogConfig());
245    }
246
247    protected String getLogConfig() {
248        return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
249    }
250
251    @Override
252    public boolean isProcessingEnabled(String queueId) {
253        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
254        return wqd != null && wqd.isProcessingEnabled();
255    }
256
257    protected void initTopology() {
258        // create a single topology with one root per work pool
259        Topology.Builder builder = Topology.builder();
260        List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP);
261        descriptors.stream().filter(WorkQueueDescriptor::isProcessingEnabled).forEach(d -> builder.addComputation(
262                () -> new WorkComputation(d.getId()), Collections.singletonList("i1:" + d.getId())));
263        topology = builder.build();
264        settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec());
265        descriptors.forEach(item -> settings.setConcurrency(item.getId(), item.getMaxThreads()));
266        descriptors.forEach(item -> settings.setPartitions(item.getId(), getPartitions(item.getMaxThreads())));
267    }
268
269    protected int getPartitions(int maxThreads) {
270        if (maxThreads == 1) {
271            // when the pool size is one the we don't want any concurrency
272            return 1;
273        }
274        return getOverProvisioningFactor() * maxThreads;
275    }
276
277    public class WorkScheduling implements Synchronization {
278        public final Work work;
279
280        public final Scheduling scheduling;
281
282        public WorkScheduling(Work work, Scheduling scheduling) {
283            this.work = work;
284            this.scheduling = scheduling;
285        }
286
287        @Override
288        public void beforeCompletion() {
289        }
290
291        @Override
292        public void afterCompletion(int status) {
293            if (status == Status.STATUS_COMMITTED) {
294                StreamWorkManager.this.schedule(work, scheduling, false);
295            } else {
296                if (status != Status.STATUS_ROLLEDBACK) {
297                    throw new IllegalArgumentException("Unsupported transaction status " + status);
298                }
299            }
300
301        }
302    }
303
304    @Override
305    void activateQueue(WorkQueueDescriptor config) {
306        // queue processing is activated only from component listener afterStart
307        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
308            throw new IllegalArgumentException("cannot activate all queues");
309        }
310        log.info("Activated queue " + config.id + " " + config.toString());
311        if (config.isProcessingEnabled()) {
312            activateQueueMetrics(config.id);
313        }
314    }
315
316    @Override
317    void deactivateQueue(WorkQueueDescriptor config) {
318        // queue processing is deactivated only on shutdown
319        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
320            throw new IllegalArgumentException("cannot deactivate all queues");
321        }
322        if (config.isProcessingEnabled()) {
323            deactivateQueueMetrics(config.id);
324        }
325        log.info("Deactivated work queue not supported: " + config.id);
326    }
327
328    @Override
329    protected void activateQueueMetrics(String queueId) {
330        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
331        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled");
332        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running");
333        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed");
334        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled");
335        registry.registerAll(queueMetrics);
336    }
337
338    @Override
339    protected void deactivateQueueMetrics(String queueId) {
340        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
341        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
342    }
343
344    @Override
345    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) {
346        log.warn("Shutdown a queue is not supported with computation implementation");
347        return false;
348    }
349
350    @Override
351    public boolean shutdown(long timeout, TimeUnit timeUnit) {
352        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
353        shutdownInProgress = true;
354        try {
355            long shutdownDelay = Long.parseLong(Framework.getService(ConfigurationService.class)
356                                                         .getProperty(SHUTDOWN_DELAY_MS_KEY, "0"));
357            boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay)));
358            if (!ret) {
359                log.error("Not able to stop worker pool within the timeout.");
360            }
361            return ret;
362        } finally {
363            shutdownInProgress = false;
364        }
365    }
366
367    @Override
368    public int getQueueSize(String queueId, Work.State state) {
369        switch (state) {
370            case SCHEDULED:
371                return getMetrics(queueId).getScheduled().intValue();
372            case RUNNING:
373                return getMetrics(queueId).getRunning().intValue();
374            default:
375                return 0;
376        }
377    }
378
379    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
380        // JMX threads have distinct class loader that need to be changed to get metrics
381        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
382        try {
383            Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader());
384            return getMetrics(queueId);
385        } finally {
386            Thread.currentThread().setContextClassLoader(classLoader);
387        }
388    }
389
390    @Override
391    public WorkQueueMetrics getMetrics(String queueId) {
392        LogLag lag = logManager.getLag(queueId, queueId);
393        long running = 0;
394        if (lag.lag() > 0) {
395            // we don't have the exact running metric
396            // give an approximation that can be higher that actual one because of the over provisioning
397            running = min(lag.lag(), settings.getPartitions(queueId));
398        }
399        return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0);
400    }
401
402    @Override
403    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
404        if (queueId != null) {
405            return awaitCompletionOnQueue(queueId, duration, unit);
406        }
407        for (Descriptor item : getDescriptors(QUEUES_EP)) {
408            if (!awaitCompletionOnQueue(item.getId(), duration, unit)) {
409                return false;
410            }
411        }
412        return true;
413    }
414
415    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException {
416        if (!isStarted()) {
417            return true;
418        }
419        log.debug("awaitCompletion " + queueId + " starting");
420        // wait for the lag to be null
421        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
422        long deadline = System.currentTimeMillis() + durationMs;
423        while (System.currentTimeMillis() < deadline) {
424            Thread.sleep(100);
425            int lag = getMetrics(queueId).getScheduled().intValue();
426            if (lag == 0) {
427                if (log.isDebugEnabled()) {
428                    log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId));
429                }
430                return true;
431            }
432            if (!log.isDebugEnabled()) {
433                log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId));
434            }
435        }
436        log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId)));
437        return false;
438    }
439
440    /**
441     * @deprecated since 10.2 because unused
442     */
443    @Deprecated
444    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
445            throws InterruptedException {
446        if (!isStarted()) {
447            return true;
448        }
449        // wait that the low watermark get stable
450        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
451        long deadline = System.currentTimeMillis() + durationMs;
452        long lowWatermark = getLowWaterMark(queueId);
453        while (System.currentTimeMillis() < deadline) {
454            Thread.sleep(100);
455            long wm = getLowWaterMark(queueId);
456            if (wm == lowWatermark) {
457                log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm);
458                return true;
459            }
460            if (log.isDebugEnabled()) {
461                log.debug("awaitCompletion low wm  for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: "
462                        + (wm - lowWatermark));
463            }
464            lowWatermark = wm;
465        }
466        log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0));
467        return false;
468    }
469
470    protected long getLowWaterMark(String queueId) {
471        if (queueId != null) {
472            return streamProcessor.getLowWatermark(queueId);
473        }
474        return streamProcessor.getLowWatermark();
475    }
476
477    @Override
478    public Work.State getWorkState(String workId) {
479        if (!storeState) {
480            return null;
481        }
482        return WorkStateHelper.getState(workId);
483    }
484
485    @Override
486    public Work find(String s, Work.State state) {
487        // always not found
488        return null;
489    }
490
491    @Override
492    public List<Work> listWork(String s, Work.State state) {
493        return Collections.emptyList();
494    }
495
496    @Override
497    public List<String> listWorkIds(String s, Work.State state) {
498        return Collections.emptyList();
499    }
500
501    @Override
502    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
503        TransactionManager transactionManager;
504        try {
505            transactionManager = TransactionHelper.lookupTransactionManager();
506        } catch (NamingException e) {
507            transactionManager = null;
508        }
509        if (transactionManager == null) {
510            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
511            return false;
512        }
513        try {
514            Transaction transaction = transactionManager.getTransaction();
515            if (transaction == null) {
516                if (log.isDebugEnabled()) {
517                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
518                }
519                return false;
520            }
521            int status = transaction.getStatus();
522            if (status == Status.STATUS_ACTIVE) {
523                if (log.isDebugEnabled()) {
524                    log.debug("Scheduled after commit: " + work.getId());
525                }
526                transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling));
527                return true;
528            } else if (status == Status.STATUS_COMMITTED) {
529                // called in afterCompletion, we can schedule immediately
530                if (log.isDebugEnabled()) {
531                    log.debug("Scheduled immediately: " + work.getId());
532                }
533                return false;
534            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
535                if (log.isDebugEnabled()) {
536                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
537                }
538                return true;
539            } else {
540                if (log.isDebugEnabled()) {
541                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
542                            + work.getId());
543                }
544                return false;
545            }
546        } catch (SystemException | RollbackException e) {
547            log.error("Cannot schedule after commit", e);
548            return false;
549        }
550    }
551
552    @Override
553    public boolean supportsProcessingDisabling() {
554        return true;
555    }
556
557}