001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import static java.lang.Math.min;
022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED;
023
024import java.lang.reflect.Field;
025import java.time.Duration;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031
032import javax.naming.NamingException;
033import javax.transaction.RollbackException;
034import javax.transaction.Status;
035import javax.transaction.Synchronization;
036import javax.transaction.SystemException;
037import javax.transaction.Transaction;
038import javax.transaction.TransactionManager;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.event.EventServiceComponent;
044import org.nuxeo.ecm.core.work.api.Work;
045import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
046import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
047import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
048import org.nuxeo.lib.stream.computation.Record;
049import org.nuxeo.lib.stream.computation.Settings;
050import org.nuxeo.lib.stream.computation.StreamProcessor;
051import org.nuxeo.lib.stream.computation.Topology;
052import org.nuxeo.lib.stream.computation.Watermark;
053import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
054import org.nuxeo.lib.stream.log.LogAppender;
055import org.nuxeo.lib.stream.log.LogLag;
056import org.nuxeo.lib.stream.log.LogManager;
057import org.nuxeo.runtime.api.Framework;
058import org.nuxeo.runtime.metrics.NuxeoMetricSet;
059import org.nuxeo.runtime.model.ComponentContext;
060import org.nuxeo.runtime.model.ComponentManager;
061import org.nuxeo.runtime.stream.StreamService;
062import org.nuxeo.runtime.transaction.TransactionHelper;
063
064import com.codahale.metrics.MetricRegistry;
065
066/**
 * WorkManager implementation that appends works into a Log. Works are therefore immutable (no state update) and
 * cannot be listed, for performance reasons.
069 *
070 * @since 9.3
071 */
072public class StreamWorkManager extends WorkManagerImpl {
    /** Logger of this work manager. */
    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);

    /** Framework property holding the name of the Log configuration to use, read by {@link #getLogConfig()}. */
    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";

    /** Log configuration name used when {@link #WORK_LOG_CONFIG_PROP} is not set. */
    public static final String DEFAULT_WORK_LOG_CONFIG = "work";

    /** Framework property holding the over provisioning factor, read by {@link #getOverProvisioningFactor()}. */
    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";

    /** Over provisioning factor (as a string) used when {@link #WORK_OVER_PROVISIONING_PROP} is not set. */
    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";

    /** Default concurrency given to the computation {@link Settings} built by {@link #initTopology()}. */
    public static final int DEFAULT_CONCURRENCY = 4;

    /** Topology with one computation per processing-enabled work queue, built by {@link #initTopology()}. */
    protected Topology topology;

    /** Concurrency and partition settings of the computations, built by {@link #initTopology()}. */
    protected Settings settings;

    /** Processor running the topology, created in {@link #init()}. */
    protected StreamProcessor streamProcessor;

    /** Log manager used to append works and to read lags, assigned in {@link #init()}. */
    protected LogManager logManager;

    /** Ids of the registered work queues; a work category found here is used as its stream name. */
    protected final Set<String> streamIds = new HashSet<>();
094
095    protected int getOverProvisioningFactor() {
096        // Enable over provisioning only if the log can be distributed
097        if (getLogManager().supportSubscribe()) {
098            return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING));
099        }
100        return 1;
101    }
102
103    @Override
104    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
105        String queueId = getStreamForCategory(work.getCategory());
106        if (log.isDebugEnabled()) {
107            log.debug(String.format(
108                    "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
109                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
110        }
111        if (!isQueuingEnabled(queueId)) {
112            log.info("Queue disabled, scheduling canceled: " + queueId);
113            return;
114        }
115        if (CANCEL_SCHEDULED.equals(scheduling)) {
116            log.warn("Canceling a work is not supported by this impl, skipping work: " + work);
117            return;
118        }
119        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
120            return;
121        }
122        WorkSchedulePath.newInstance(work);
123        LogAppender<Record> appender = logManager.getAppender(getStreamForCategory(work.getCategory()));
124        if (appender == null) {
125            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
126                    getStreamForCategory(work.getCategory())));
127            return;
128        }
129        String key = work.getPartitionKey();
130        appender.append(key, new Record(key, WorkComputation.serialize(work),
131                Watermark.ofTimestamp(System.currentTimeMillis()).getValue(), null));
132    }
133
134    protected String getStreamForCategory(String category) {
135        if (category != null && streamIds.contains(category)) {
136            return category;
137        }
138        return DEFAULT_CATEGORY;
139    }
140
141    @Override
142    public int getApplicationStartedOrder() {
143        // start before the WorkManagerImpl
144        return EventServiceComponent.APPLICATION_STARTED_ORDER - 2;
145    }
146
147    @Override
148    public void start(ComponentContext context) {
149        init();
150    }
151
152    @Override
153    public void init() {
154        if (started) {
155            return;
156        }
157        log.debug("Initializing");
158        synchronized (this) {
159            if (started) {
160                return;
161            }
162            supplantWorkManagerImpl();
163            workQueueConfig.index();
164            initTopology();
165            this.logManager = getLogManager();
166            this.streamProcessor = new LogStreamProcessor(logManager);
167            streamProcessor.init(topology, settings);
168            started = true;
169            new ComponentListener().install();
170            log.info("Initialized");
171        }
172    }
173
174    class ComponentListener implements ComponentManager.Listener {
175        @Override
176        public void beforeStop(ComponentManager mgr, boolean isStandby) {
177            try {
178                if (!shutdown(10, TimeUnit.SECONDS)) {
179                    log.error("Some processors are still active");
180                }
181            } catch (InterruptedException e) {
182                Thread.currentThread().interrupt();
183                throw new NuxeoException("Interrupted while stopping work manager thread pools", e);
184            }
185        }
186
187        @Override
188        public void afterStart(ComponentManager mgr, boolean isResume) {
189            streamProcessor.start();
190            for (String id : workQueueConfig.getQueueIds()) {
191                activateQueueMetrics(id);
192            }
193        }
194
195        @Override
196        public void afterStop(ComponentManager mgr, boolean isStandby) {
197            Framework.getRuntime().getComponentManager().removeListener(this);
198            for (String id : workQueueConfig.getQueueIds()) {
199                deactivateQueueMetrics(id);
200            }
201        }
202    }
203
204    protected LogManager getLogManager() {
205        String config = getLogConfig();
206        log.info("Init StreamWorkManager with Log configuration: " + config);
207        StreamService service = Framework.getService(StreamService.class);
208        return service.getLogManager(getLogConfig());
209    }
210
211    protected String getLogConfig() {
212        return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
213    }
214
215    @Override
216    public boolean isProcessingEnabled(String queueId) {
217        WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId);
218        return wqd != null && wqd.isProcessingEnabled();
219    }
220
221    /**
222     * Hack to steal the WorkManagerImpl queue contributions.
223     */
224    protected void supplantWorkManagerImpl() {
225        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
226        Class clazz = WorkManagerImpl.class;
227        Field workQueueConfigField;
228        try {
229            workQueueConfigField = clazz.getDeclaredField("workQueueConfig");
230        } catch (NoSuchFieldException e) {
231            throw new RuntimeException(e);
232        }
233        workQueueConfigField.setAccessible(true);
234        final WorkQueueRegistry wqr;
235        try {
236            wqr = (WorkQueueRegistry) workQueueConfigField.get(wmi);
237            log.debug("Remove contributions from WorkManagerImpl");
238            // Removes the WorkManagerImpl so it does not create any worker pool
239            workQueueConfigField.set(wmi, new WorkQueueRegistry());
240            // TODO: should we remove workQueuingConfig registry as well ?
241        } catch (IllegalAccessException e) {
242            throw new RuntimeException(e);
243        }
244        wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id)));
245        streamIds.addAll(workQueueConfig.getQueueIds());
246        workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id));
247    }
248
249    protected void initTopology() {
250        // create a single topology with one root per work pool
251        Topology.Builder builder = Topology.builder();
252        workQueueConfig.getQueueIds().stream().filter(item -> workQueueConfig.get(item).isProcessingEnabled()).forEach(
253                item -> builder.addComputation(() -> new WorkComputation(item),
254                        Collections.singletonList("i1:" + item)));
255        this.topology = builder.build();
256        this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY));
257        workQueueConfig.getQueueIds()
258                       .forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads()));
259        workQueueConfig.getQueueIds().forEach(
260                item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads())));
261    }
262
263    protected int getPartitions(int maxThreads) {
264        if (maxThreads == 1) {
265            // when the pool size is one the we don't want any concurrency
266            return 1;
267        }
268        return getOverProvisioningFactor() * maxThreads;
269    }
270
271    public class WorkScheduling implements Synchronization {
272        public final Work work;
273
274        public final Scheduling scheduling;
275
276        public WorkScheduling(Work work, Scheduling scheduling) {
277            this.work = work;
278            this.scheduling = scheduling;
279        }
280
281        @Override
282        public void beforeCompletion() {
283        }
284
285        @Override
286        public void afterCompletion(int status) {
287            if (status == Status.STATUS_COMMITTED) {
288                StreamWorkManager.this.schedule(this.work, this.scheduling, false);
289            } else {
290                if (status != Status.STATUS_ROLLEDBACK) {
291                    throw new IllegalArgumentException("Unsupported transaction status " + status);
292                }
293            }
294
295        }
296    }
297
298    @Override
299    void activateQueue(WorkQueueDescriptor config) {
300        // queue processing is activated only from component listener afterStart
301        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
302            throw new IllegalArgumentException("cannot activate all queues");
303        }
304        log.info("Activated queue " + config.id + " " + config.toEffectiveString());
305        if (config.isProcessingEnabled()) {
306            activateQueueMetrics(config.id);
307        }
308    }
309
310    @Override
311    void deactivateQueue(WorkQueueDescriptor config) {
312        // queue processing is deactivated only on shutdown
313        if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) {
314            throw new IllegalArgumentException("cannot deactivate all queues");
315        }
316        if (config.isProcessingEnabled()) {
317            deactivateQueueMetrics(config.id);
318        }
319        log.info("Deactivated work queue not supported: " + config.id);
320    }
321
322    @Override
323    protected void activateQueueMetrics(String queueId) {
324        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId);
325        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled");
326        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running");
327        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed");
328        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled");
329        registry.registerAll(queueMetrics);
330    }
331
332    @Override
333    protected void deactivateQueueMetrics(String queueId) {
334        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
335        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
336    }
337
338    @Override
339    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
340        log.warn("Shutdown a queue is not supported with computation implementation");
341        return false;
342    }
343
344    @Override
345    public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
346        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
347        shutdownInProgress = true;
348        try {
349            boolean ret = streamProcessor.stop(Duration.ofMillis(timeUnit.toMillis(timeout)));
350            if (!ret) {
351                log.error("Not able to stop worker pool within the timeout.");
352            }
353            return ret;
354        } finally {
355            shutdownInProgress = false;
356        }
357    }
358
359    @Override
360    public int getQueueSize(String queueId, Work.State state) {
361        switch (state) {
362        case SCHEDULED:
363            return getMetrics(queueId).getScheduled().intValue();
364        case RUNNING:
365            return getMetrics(queueId).getRunning().intValue();
366        default:
367            return 0;
368        }
369    }
370
371    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
372        // JMX threads have distinct class loader that need to be changed to get metrics
373        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
374        try {
375            Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader());
376            return getMetrics(queueId);
377        } finally {
378            Thread.currentThread().setContextClassLoader(classLoader);
379        }
380    }
381
382    @Override
383    public WorkQueueMetrics getMetrics(String queueId) {
384        LogLag lag = logManager.getLag(queueId, queueId);
385        long running = 0;
386        if (lag.lag() > 0) {
387            // we don't have the exact running metric
388            // give an approximation that can be higher that actual one because of the over provisioning
389            running = min(lag.lag(), settings.getPartitions(queueId));
390        }
391        return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0);
392    }
393
394    @Override
395    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
396        if (queueId != null) {
397            return awaitCompletionOnQueue(queueId, duration, unit);
398        }
399        for (String item : workQueueConfig.getQueueIds()) {
400            if (!awaitCompletionOnQueue(item, duration, unit)) {
401                return false;
402            }
403        }
404        return true;
405    }
406
407    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException {
408        if (!isStarted()) {
409            return true;
410        }
411        log.debug("awaitCompletion " + queueId + " starting");
412        // wait for the lag to be null
413        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
414        long deadline = System.currentTimeMillis() + durationMs;
415        while (System.currentTimeMillis() < deadline) {
416            Thread.sleep(100);
417            int lag = getMetrics(queueId).getScheduled().intValue();
418            if (lag == 0) {
419                if (log.isDebugEnabled()) {
420                    log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId));
421                }
422                return true;
423            }
424            if (!log.isDebugEnabled()) {
425                log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId));
426            }
427        }
428        log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId)));
429        return false;
430    }
431
432    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
433            throws InterruptedException {
434        if (!isStarted()) {
435            return true;
436        }
437        // wait that the low watermark get stable
438        long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
439        long deadline = System.currentTimeMillis() + durationMs;
440        long lowWatermark = getLowWaterMark(queueId);
441        while (System.currentTimeMillis() < deadline) {
442            Thread.sleep(100);
443            long wm = getLowWaterMark(queueId);
444            if (wm == lowWatermark) {
445                log.debug("awaitCompletion for " + ((queueId == null) ? "all" : queueId) + " completed " + wm);
446                return true;
447            }
448            if (log.isDebugEnabled()) {
449                log.debug("awaitCompletion low wm  for " + ((queueId == null) ? "all" : queueId) + ":" + wm + " diff: "
450                        + (wm - lowWatermark));
451            }
452            lowWatermark = wm;
453        }
454        log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0));
455        return false;
456    }
457
458    protected long getLowWaterMark(String queueId) {
459        if (queueId != null) {
460            return streamProcessor.getLowWatermark(queueId);
461        }
462        return streamProcessor.getLowWatermark();
463    }
464
465    @Override
466    public Work.State getWorkState(String s) {
467        // always not found
468        return null;
469    }
470
471    @Override
472    public Work find(String s, Work.State state) {
473        // always not found
474        return null;
475    }
476
477    @Override
478    public List<Work> listWork(String s, Work.State state) {
479        return Collections.emptyList();
480    }
481
482    @Override
483    public List<String> listWorkIds(String s, Work.State state) {
484        return Collections.emptyList();
485    }
486
487    @Override
488    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
489        TransactionManager transactionManager;
490        try {
491            transactionManager = TransactionHelper.lookupTransactionManager();
492        } catch (NamingException e) {
493            transactionManager = null;
494        }
495        if (transactionManager == null) {
496            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
497            return false;
498        }
499        try {
500            Transaction transaction = transactionManager.getTransaction();
501            if (transaction == null) {
502                if (log.isDebugEnabled()) {
503                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
504                }
505                return false;
506            }
507            int status = transaction.getStatus();
508            if (status == Status.STATUS_ACTIVE) {
509                if (log.isDebugEnabled()) {
510                    log.debug("Scheduled after commit: " + work.getId());
511                }
512                transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling));
513                return true;
514            } else if (status == Status.STATUS_COMMITTED) {
515                // called in afterCompletion, we can schedule immediately
516                if (log.isDebugEnabled()) {
517                    log.debug("Scheduled immediately: " + work.getId());
518                }
519                return false;
520            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
521                if (log.isDebugEnabled()) {
522                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
523                }
524                return true;
525            } else {
526                if (log.isDebugEnabled()) {
527                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
528                            + work.getId());
529                }
530                return false;
531            }
532        } catch (SystemException | RollbackException e) {
533            log.error("Cannot schedule after commit", e);
534            return false;
535        }
536    }
537
538}