001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import static java.lang.Math.min;
022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED;
023
024import java.lang.reflect.Field;
025import java.time.Duration;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031
032import javax.naming.NamingException;
033import javax.transaction.RollbackException;
034import javax.transaction.Status;
035import javax.transaction.Synchronization;
036import javax.transaction.SystemException;
037import javax.transaction.Transaction;
038import javax.transaction.TransactionManager;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.ecm.core.event.EventServiceComponent;
043import org.nuxeo.ecm.core.work.api.Work;
044import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
045import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
046import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
047import org.nuxeo.lib.stream.codec.Codec;
048import org.nuxeo.lib.stream.computation.Record;
049import org.nuxeo.lib.stream.computation.Settings;
050import org.nuxeo.lib.stream.computation.StreamProcessor;
051import org.nuxeo.lib.stream.computation.Topology;
052import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
053import org.nuxeo.lib.stream.log.LogAppender;
054import org.nuxeo.lib.stream.log.LogLag;
055import org.nuxeo.lib.stream.log.LogManager;
056import org.nuxeo.runtime.api.Framework;
057import org.nuxeo.runtime.codec.CodecService;
058import org.nuxeo.runtime.metrics.NuxeoMetricSet;
059import org.nuxeo.runtime.model.ComponentContext;
060import org.nuxeo.runtime.model.ComponentManager;
061import org.nuxeo.runtime.services.config.ConfigurationService;
062import org.nuxeo.runtime.stream.StreamService;
063import org.nuxeo.runtime.transaction.TransactionHelper;
064
065import com.codahale.metrics.MetricRegistry;
066
/**
 * WorkManager implementation that appends works into a Log. Works are therefore immutable (no state update) and
 * cannot be listed, for performance reasons.
 *
 * @since 9.3
 */
073public class StreamWorkManager extends WorkManagerImpl {
074
075    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);
076
077    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";
078
079    public static final String DEFAULT_WORK_LOG_CONFIG = "work";
080
081    public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec";
082
083    public static final String DEFAULT_WORK_CODEC = "legacy";
084
085    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";
086
087    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";
088
089    public static final int DEFAULT_CONCURRENCY = 4;
090
091    /**
092     * @since 10.2
093     */
094    public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds";
095
096    /**
097     * @since 10.2
098     */
099    public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled";
100
101    /**
102     * @since 10.2
103     */
104    public static final String STATETTL_DEFAULT_VALUE = "3600";
105
106    protected Topology topology;
107
108    protected Settings settings;
109
110    protected StreamProcessor streamProcessor;
111
112    protected LogManager logManager;
113
114    protected final Set<String> streamIds = new HashSet<>();
115
116    protected boolean storeState;
117
118    protected long stateTTL;
119
120    protected int getOverProvisioningFactor() {
121        // Enable over provisioning only if the log can be distributed
122        if (getLogManager().supportSubscribe()) {
123            return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING));
124        }
125        return 1;
126    }
127
128    protected String getCodecName() {
129        return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC);
130    }
131
132    protected Codec<Record> getCodec() {
133        return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class);
134    }
135
136    @Override
137    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
138        String queueId = getStreamForCategory(work.getCategory());
139        if (log.isDebugEnabled()) {
140            log.debug(String.format(
141                    "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
142                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
143        }
144        if (!isQueuingEnabled(queueId)) {
145            log.info("Queue disabled, scheduling canceled: " + queueId);
146            return;
147        }
148        if (CANCEL_SCHEDULED.equals(scheduling)) {
149            if (storeState) {
150                if (WorkStateHelper.getState(work.getId()) != null) {
151                    WorkStateHelper.setCanceled(work.getId());
152                }
153            } else {
154                log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s",
155                        STORESTATE_KEY, work));
156            }
157            return;
158        }
159        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
160            return;
161        }
162        WorkSchedulePath.newInstance(work);
163        // We don't need to set a codec because appender is initialized with proper codec during processor init
164        LogAppender<Record> appender = logManager.getAppender(getStreamForCategory(work.getCategory()));
165        if (appender == null) {
166            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
167                    getStreamForCategory(work.getCategory())));
168            return;
169        }
170        String key = work.getPartitionKey();
171        appender.append(key, Record.of(key, WorkComputation.serialize(work)));
172        if (storeState) {
173            WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL);
174        }
175    }
176
177    protected String getStreamForCategory(String category) {
178        if (category != null && streamIds.contains(category)) {
179            return category;
180        }
181        return DEFAULT_CATEGORY;
182    }
183
184    @Override
185    public int getApplicationStartedOrder() {
186        // start before the WorkManagerImpl
187        return EventServiceComponent.APPLICATION_STARTED_ORDER - 2;
188    }
189
190    @Override
191    public void start(ComponentContext context) {
192        init();
193        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
194        storeState = configuration.isBooleanPropertyTrue(STORESTATE_KEY);
195        stateTTL = Long.parseLong(configuration.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE));
196    }
197
198    @Override
199    public void init() {
200        if (started) {
201            return;
202        }
203        log.debug("Initializing");
204        synchronized (this) {
205            if (started) {
206                return;
207            }
208            supplantWorkManagerImpl();
209            workQueueConfig.index();
210            initTopology();
211            logManager = getLogManager();
212            streamProcessor = new LogStreamProcessor(logManager);
213            streamProcessor.init(topology, settings);
214            started = true;
215            new ComponentListener().install();
216            log.info("Initialized");
217        }
218    }
219
220    class ComponentListener implements ComponentManager.Listener {
221        @Override
222        public void beforeStop(ComponentManager mgr, boolean isStandby) {
223            if (!shutdown(10, TimeUnit.SECONDS)) {
224                log.error("Some processors are still active");
225            }
226        }
227
228        @Override
229        public void afterStart(ComponentManager mgr, boolean isResume) {
230            streamProcessor.start();
231            for (String id : workQueueConfig.getQueueIds()) {
232                activateQueueMetrics(id);
233            }
234        }
235
236        @Override
237        public void afterStop(ComponentManager mgr, boolean isStandby) {
238            Framework.getRuntime().getComponentManager().removeListener(this);
239            for (String id : workQueueConfig.getQueueIds()) {
240                deactivateQueueMetrics(id);
241            }
242        }
243    }
244
245    protected LogManager getLogManager() {
246        String config = getLogConfig();
247        log.info("Init StreamWorkManager with Log configuration: " + config);
248        StreamService service = Framework.getService(StreamService.class);
249        return service.getLogManager(getLogConfig());
250    }
251
252    protected String getLogConfig() {
253        return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
254    }
255
256    @Override
257    public boolean isProcessingEnabled(String queueId) {
258        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
259        return wqd != null && wqd.isProcessingEnabled();
260    }
261
262    /**
263     * Hack to steal the WorkManagerImpl queue contributions.
264     */
265    protected void supplantWorkManagerImpl() {
266        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
267        Class clazz = WorkManagerImpl.class;
268        Field workQueueConfigField;
269        try {
270            workQueueConfigField = clazz.getDeclaredField("workQueueConfig");
271        } catch (NoSuchFieldException e) {
272            throw new RuntimeException(e);
273        }
274        workQueueConfigField.setAccessible(true);
275        final WorkQueueRegistry wqr;
276        try {
277            wqr = (WorkQueueRegistry) workQueueConfigField.get(wmi);
278            log.debug("Remove contributions from WorkManagerImpl");
279            // Removes the WorkManagerImpl so it does not create any worker pool
280            workQueueConfigField.set(wmi, new WorkQueueRegistry());
281            // TODO: should we remove workQueuingConfig registry as well ?
282        } catch (IllegalAccessException e) {
283            throw new RuntimeException(e);
284        }
285        wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id)));
286        streamIds.addAll(workQueueConfig.getQueueIds());
287        workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id));
288    }
289
290    protected void initTopology() {
291        // create a single topology with one root per work pool
292        Topology.Builder builder = Topology.builder();
293        workQueueConfig.getQueueIds().stream().filter(item -> workQueueConfig.get(item).isProcessingEnabled()).forEach(
294                item -> builder.addComputation(() -> new WorkComputation(item),
295                        Collections.singletonList("i1:" + item)));
296        topology = builder.build();
297        settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec());
298        workQueueConfig.getQueueIds()
299                       .forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads()));
300        workQueueConfig.getQueueIds().forEach(
301                item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads())));
302    }
303
304    protected int getPartitions(int maxThreads) {
305        if (maxThreads == 1) {
306            // when the pool size is one the we don't want any concurrency
307            return 1;
308        }
309        return getOverProvisioningFactor() * maxThreads;
310    }
311
312    public class WorkScheduling implements Synchronization {
313        public final Work work;
314
315        public final Scheduling scheduling;
316
317        public WorkScheduling(Work work, Scheduling scheduling) {
318            this.work = work;
319            this.scheduling = scheduling;
320        }
321
322        @Override
323        public void beforeCompletion() {
324        }
325
326        @Override
327        public void afterCompletion(int status) {
328            if (status == Status.STATUS_COMMITTED) {
329                StreamWorkManager.this.schedule(work, scheduling, false);
330            } else {
331                if (status != Status.STATUS_ROLLEDBACK) {
332                    throw new IllegalArgumentException("Unsupported transaction status " + status);
333                }
334            }
335
336        }
337    }
338
339    @Override
340    void activateQueue(WorkQueueDescriptor config) {
341        // queue processing is activated only from component listener afterStart
342        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
343            throw new IllegalArgumentException("cannot activate all queues");
344        }
345        log.info("Activated queue " + config.id + " " + config.toEffectiveString());
346        if (config.isProcessingEnabled()) {
347            activateQueueMetrics(config.id);
348        }
349    }
350
351    @Override
352    void deactivateQueue(WorkQueueDescriptor config) {
353        // queue processing is deactivated only on shutdown
354        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
355            throw new IllegalArgumentException("cannot deactivate all queues");
356        }
357        if (config.isProcessingEnabled()) {
358            deactivateQueueMetrics(config.id);
359        }
360        log.info("Deactivated work queue not supported: " + config.id);
361    }
362
363    @Override
364    protected void activateQueueMetrics(String queueId) {
365        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
366        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled");
367        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running");
368        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed");
369        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled");
370        registry.registerAll(queueMetrics);
371    }
372
373    @Override
374    protected void deactivateQueueMetrics(String queueId) {
375        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
376        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
377    }
378
379    @Override
380    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) {
381        log.warn("Shutdown a queue is not supported with computation implementation");
382        return false;
383    }
384
385    @Override
386    public boolean shutdown(long timeout, TimeUnit timeUnit) {
387        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
388        shutdownInProgress = true;
389        try {
390            long shutdownDelay = Long.parseLong(Framework.getService(ConfigurationService.class)
391                                                         .getProperty(SHUTDOWN_DELAY_MS_KEY, "0"));
392            boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay)));
393            if (!ret) {
394                log.error("Not able to stop worker pool within the timeout.");
395            }
396            return ret;
397        } finally {
398            shutdownInProgress = false;
399        }
400    }
401
402    @Override
403    public int getQueueSize(String queueId, Work.State state) {
404        switch (state) {
405        case SCHEDULED:
406            return getMetrics(queueId).getScheduled().intValue();
407        case RUNNING:
408            return getMetrics(queueId).getRunning().intValue();
409        default:
410            return 0;
411        }
412    }
413
414    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
415        // JMX threads have distinct class loader that need to be changed to get metrics
416        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
417        try {
418            Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader());
419            return getMetrics(queueId);
420        } finally {
421            Thread.currentThread().setContextClassLoader(classLoader);
422        }
423    }
424
425    @Override
426    public WorkQueueMetrics getMetrics(String queueId) {
427        LogLag lag = logManager.getLag(queueId, queueId);
428        long running = 0;
429        if (lag.lag() > 0) {
430            // we don't have the exact running metric
431            // give an approximation that can be higher that actual one because of the over provisioning
432            running = min(lag.lag(), settings.getPartitions(queueId));
433        }
434        return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0);
435    }
436
437    @Override
438    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
439        if (queueId != null) {
440            return awaitCompletionOnQueue(queueId, duration, unit);
441        }
442        for (String item : workQueueConfig.getQueueIds()) {
443            if (!awaitCompletionOnQueue(item, duration, unit)) {
444                return false;
445            }
446        }
447        return true;
448    }
449
450    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException {
451        if (!isStarted()) {
452            return true;
453        }
454        log.debug("awaitCompletion " + queueId + " starting");
455        // wait for the lag to be null
456        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
457        long deadline = System.currentTimeMillis() + durationMs;
458        while (System.currentTimeMillis() < deadline) {
459            Thread.sleep(100);
460            int lag = getMetrics(queueId).getScheduled().intValue();
461            if (lag == 0) {
462                if (log.isDebugEnabled()) {
463                    log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId));
464                }
465                return true;
466            }
467            if (!log.isDebugEnabled()) {
468                log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId));
469            }
470        }
471        log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId)));
472        return false;
473    }
474
475    /**
476     * @deprecated since 10.2 because unused
477     */
478    @Deprecated
479    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
480            throws InterruptedException {
481        if (!isStarted()) {
482            return true;
483        }
484        // wait that the low watermark get stable
485        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
486        long deadline = System.currentTimeMillis() + durationMs;
487        long lowWatermark = getLowWaterMark(queueId);
488        while (System.currentTimeMillis() < deadline) {
489            Thread.sleep(100);
490            long wm = getLowWaterMark(queueId);
491            if (wm == lowWatermark) {
492                log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm);
493                return true;
494            }
495            if (log.isDebugEnabled()) {
496                log.debug("awaitCompletion low wm  for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: "
497                        + (wm - lowWatermark));
498            }
499            lowWatermark = wm;
500        }
501        log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0));
502        return false;
503    }
504
505    protected long getLowWaterMark(String queueId) {
506        if (queueId != null) {
507            return streamProcessor.getLowWatermark(queueId);
508        }
509        return streamProcessor.getLowWatermark();
510    }
511
512    @Override
513    public Work.State getWorkState(String workId) {
514        if (!storeState) {
515            return null;
516        }
517        return WorkStateHelper.getState(workId);
518    }
519
520    @Override
521    public Work find(String s, Work.State state) {
522        // always not found
523        return null;
524    }
525
526    @Override
527    public List<Work> listWork(String s, Work.State state) {
528        return Collections.emptyList();
529    }
530
531    @Override
532    public List<String> listWorkIds(String s, Work.State state) {
533        return Collections.emptyList();
534    }
535
536    @Override
537    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
538        TransactionManager transactionManager;
539        try {
540            transactionManager = TransactionHelper.lookupTransactionManager();
541        } catch (NamingException e) {
542            transactionManager = null;
543        }
544        if (transactionManager == null) {
545            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
546            return false;
547        }
548        try {
549            Transaction transaction = transactionManager.getTransaction();
550            if (transaction == null) {
551                if (log.isDebugEnabled()) {
552                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
553                }
554                return false;
555            }
556            int status = transaction.getStatus();
557            if (status == Status.STATUS_ACTIVE) {
558                if (log.isDebugEnabled()) {
559                    log.debug("Scheduled after commit: " + work.getId());
560                }
561                transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling));
562                return true;
563            } else if (status == Status.STATUS_COMMITTED) {
564                // called in afterCompletion, we can schedule immediately
565                if (log.isDebugEnabled()) {
566                    log.debug("Scheduled immediately: " + work.getId());
567                }
568                return false;
569            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
570                if (log.isDebugEnabled()) {
571                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
572                }
573                return true;
574            } else {
575                if (log.isDebugEnabled()) {
576                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
577                            + work.getId());
578                }
579                return false;
580            }
581        } catch (SystemException | RollbackException e) {
582            log.error("Cannot schedule after commit", e);
583            return false;
584        }
585    }
586
587}