/*
 * (C) Copyright 2017-2019 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

import io.dropwizard.metrics5.Counter;
import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.dropwizard.metrics5.Timer;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.Link;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;

import net.jodah.failsafe.Failsafe;

/**
 * Thread driving a {@link Computation}: it reads records from the input streams, triggers the computation timers and
 * manages checkpoints (sending output records and committing input positions).
 *
 * @since 9.3
 */
@SuppressWarnings("EmptyMethod")
public class ComputationRunner implements Runnable, RebalanceListener {
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

    protected static final long STARVING_TIMEOUT_MS = 1000;

    protected static final long INACTIVITY_BREAK_MS = 100;

    private static final Log log = LogFactory.getLog(ComputationRunner.class);

    protected final LogStreamManager streamManager;

    protected final ComputationMetadataMapping metadata;

    protected final LogTailer<Record> tailer;

    protected final Supplier<Computation> supplier;

    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);

    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();

    protected final ComputationPolicy policy;

    protected ComputationContextImpl context;

    protected volatile boolean stop;

    protected volatile boolean drain;

    protected Computation computation;

    protected long counter;

    protected long inRecords;

    protected long inCheckpointRecords;

    protected long outRecords;

    protected long lastReadTime = System.currentTimeMillis();

    protected long lastTimerExecution;

    protected String threadName;

    protected List<LogPartition> defaultAssignment;

    // @since 11.1
    // Use the Nuxeo registry name without adding a dependency on nuxeo-runtime
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";

    public static final String GLOBAL_FAILURE_COUNT_REGISTRY_NAME = MetricRegistry.name("nuxeo", "streams", "failure")
                                                                                  .getKey();

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);

    protected Counter globalFailureCount;

    protected Counter failureCount;

    protected Counter recordSkippedCount;

    protected Counter runningCount;

    protected Timer processRecordTimer;

    protected Timer processTimerTimer;

    // @since 11.1
    protected static AtomicInteger skipFailures = new AtomicInteger(0);

    // @since 11.1
    protected boolean recordActivity;

    protected SpanContext lastSpanContext;

    @SuppressWarnings("unchecked")
    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<LogPartition> defaultAssignment, LogStreamManager streamManager, ComputationPolicy policy) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.streamManager = streamManager;
        this.policy = policy;
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            this.context = new ComputationContextImpl(streamManager, metadata, policy, false);
            assignmentLatch.countDown();
        } else if (streamManager.supportSubscribe()) {
            this.tailer = streamManager.subscribe(Name.ofUrn(metadata.name()),
                    metadata.inputStreams().stream().map(Name::ofUrn).collect(Collectors.toList()), this);
            // create a spare context until the assignment is done
            this.context = new ComputationContextImpl(streamManager, metadata, policy, true);
        } else {
            this.context = new ComputationContextImpl(streamManager, metadata, policy, defaultAssignment.isEmpty());
            this.tailer = streamManager.createTailer(Name.ofUrn(metadata.name()), defaultAssignment);
            assignmentLatch.countDown();
        }
        this.defaultAssignment = defaultAssignment;
    }

    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
        if (computation != null) {
            computation.signalStop();
        }
    }

    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
            return false;
        }
        return true;
    }
    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        boolean normalTermination = false;
        computation = supplier.get();
        log.debug(metadata.name() + ": Init");
        registerMetrics();
        try {
            computation.init(context);
            log.debug(metadata.name() + ": Start");
            processLoop();
            normalTermination = true;
        } catch (InterruptedException e) {
            interrupted = true; // Thread.currentThread().interrupt() in finally
            // this is expected when the pool is shut down with shutdownNow
            String msg = metadata.name() + ": Interrupted";
            if (log.isTraceEnabled()) {
                log.debug(msg, e);
            } else {
                log.debug(msg);
            }
        } catch (Exception e) {
            if (Thread.interrupted()) {
                // clearing the interrupt flag is wanted because the closeTailer method needs a non-interrupted thread
                interrupted = true; // Thread.currentThread().interrupt() in finally
                // this can happen when the pool is shut down with shutdownNow, throwing ClosedByInterruptException
                log.info(metadata.name() + ": Interrupted", e);
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                computation.destroy();
                closeTailer();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (normalTermination || interrupted) {
                log.debug(metadata.name() + ": Terminated " + (interrupted ? "interrupted" : "normal"));
            } else {
                // Terminating because of an unexpected error in the ComputationRunner code
                log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
                globalFailureCount.inc();
                failureCount.inc();
            }
        }
    }

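    /**
     * Registers the computation metrics (running, failure, skipped record counters and processing timers) in the
     * shared metric registry.
     */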
    protected void registerMetrics() {
        globalFailureCount = registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME);
        String name = Name.ofUrn(metadata.name()).getId();
        runningCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.running").tagged("computation", name));
        failureCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.failure").tagged("computation", name));
        recordSkippedCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.skippedRecord").tagged("computation", name));
        processRecordTimer = registry.timer(
                MetricName.build("nuxeo.streams.computation.processRecord").tagged("computation", name));
        processTimerTimer = registry.timer(
                MetricName.build("nuxeo.streams.computation.processTimer").tagged("computation", name));
    }

    protected void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            tailer.close();
        }
    }

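    /**
     * Main loop: alternates timer and record processing until {@link #continueLoop()} returns false, sleeping briefly
     * when there is no activity.
     */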
    protected void processLoop() throws InterruptedException {
        boolean timerActivity;
        while (continueLoop()) {
            timerActivity = processTimer();
            recordActivity = processRecord();
            counter++;
            if (!timerActivity && !recordActivity) {
                // no activity, take a break
                Thread.sleep(INACTIVITY_BREAK_MS);
            }
        }
    }

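    /**
     * Returns {@code false} when the runner must exit the loop: stop requested, thread interrupted, or drain completed
     * (no more record or timer activity for {@link #STARVING_TIMEOUT_MS}).
     */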
    protected boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            log.debug(metadata.name() + ": Stop processing " + (stop ? "stop required" : "interrupted"));
            return false;
        } else if (drain) {
            long now = System.currentTimeMillis();
            if (metadata.inputStreams().isEmpty()) {
                // for a source computation, starvation is based on the last timer execution
                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
                    return false;
                }
            } else if (!recordActivity && (now - lastReadTime) > STARVING_TIMEOUT_MS) {
                log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
                            + inRecords + " records read, " + counter + " read attempts");
                return false;
            }
        }
        return true;
    }

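    /**
     * Fires the expired computation timers in chronological order; does nothing when the runner has no partition
     * assigned (spare runner).
     *
     * @return {@code true} if at least one timer was executed
     */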
    protected boolean processTimer() {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (tailer != null && tailer.assignments().isEmpty()) {
            // needed to ensure a single source across multiple nodes
            return false;
        }
        long now = System.currentTimeMillis();
        // filter and order timers
        LinkedHashMap<String, Long> sortedTimer = timers.entrySet()
                                                        .stream()
                                                        .filter(entry -> entry.getValue() <= now)
                                                        .sorted(Map.Entry.comparingByValue())
                                                        .collect(
                                                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                                                                        (e1, e2) -> e1, LinkedHashMap::new));
        if (sortedTimer.isEmpty()) {
            return false;
        }
        return processTimerWithTracing(now, sortedTimer);
    }

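    /**
     * Executes the expired timers inside a tracing span linked to the last record span when one is available, then
     * performs a checkpoint if required.
     */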
    protected boolean processTimerWithTracing(long now, LinkedHashMap<String, Long> sortedTimer) {
        Tracer tracer = Tracing.getTracer();
        Span span;
        if (lastSpanContext != null) {
            span = tracer.spanBuilderWithRemoteParent("comp/" + computation.metadata().name() + "/timer",
                    lastSpanContext).startSpan();
            span.addLink(Link.fromSpanContext(lastSpanContext, Link.Type.PARENT_LINKED_SPAN));
            HashMap<String, AttributeValue> map = new HashMap<>();
            map.put("comp.name", AttributeValue.stringAttributeValue(computation.metadata().name()));
            map.put("comp.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
            map.put("record.last_offset", AttributeValue.stringAttributeValue(context.getLastOffset().toString()));
            span.putAttributes(map);
            lastSpanContext = null;
        } else {
            span = BlankSpan.INSTANCE;
        }
        try (Scope scope = Tracing.getTracer().withSpan(span)) {
            final boolean[] timerUpdate = { false };
            sortedTimer.forEach((key, value) -> {
                context.removeTimer(key);
                processTimerWithRetry(key, value);
                timerUpdate[0] = true;
            });
            if (timerUpdate[0]) {
                checkSourceLowWatermark();
                lastTimerExecution = now;
                setThreadName("timer");
                checkpointIfNecessary();
                if (context.requireTerminate()) {
                    stop = true;
                }
                return true;
            }
            return false;
        } finally {
            span.end();
        }
    }

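    /**
     * Runs {@link Computation#processTimer} under the retry policy, falling back to {@link #processFallback} when all
     * retries are exhausted.
     */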
    protected void processTimerWithRetry(String key, Long value) {
        try (Timer.Context ignored = processTimerTimer.time()) {
            Failsafe.with(policy.getRetryPolicy())
                    .onRetry(failure -> computation.processRetry(context, failure))
                    .onFailure(failure -> computation.processFailure(context, failure))
                    .withFallback(() -> processFallback(context))
                    .run(() -> computation.processTimer(context, key, value));
        }
    }

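    /**
     * Reads at most one record from the tailer, applies the stream filter, then processes the record.
     *
     * @return {@code true} when there was activity: a record was processed or a termination was requested
     */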
    protected boolean processRecord() throws InterruptedException {
        if (context.requireTerminate()) {
            stop = true;
            return true;
        }
        if (tailer == null) {
            return false;
        }
        Duration timeoutRead = getTimeoutDuration();
        LogRecord<Record> logRecord = null;
        try {
            logRecord = tailer.read(timeoutRead);
        } catch (RebalanceException e) {
            // the revoke has done a checkpoint, we can continue
        }
        Record record;
        if (logRecord != null) {
            record = logRecord.message();
            Name stream = logRecord.offset().partition().name();
            Record filteredRecord = streamManager.getFilter(stream).afterRead(record, logRecord.offset());
            if (filteredRecord == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Filtering skip record: " + record);
                }
                return false;
            } else if (filteredRecord != record) {
                logRecord = new LogRecord<>(filteredRecord, logRecord.offset());
                record = filteredRecord;
            }
            lastReadTime = System.currentTimeMillis();
            inRecords++;
            lowWatermark.mark(record.getWatermark());
            context.setLastOffset(logRecord.offset());
            String from = metadata.reverseMap(stream.getUrn());
            processRecordWithTracing(from, record);
            return true;
        }
        return false;
    }

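    /**
     * Processes a record within its tracing span, then checks the record flags, the source low watermark and whether a
     * checkpoint is required.
     */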
    protected void processRecordWithTracing(String from, Record record) {
        Span span = getSpanFromRecord(record);
        try (Scope scope = Tracing.getTracer().withSpan(span)) {
            processRecordWithRetry(from, record);
            checkRecordFlags(record);
            checkSourceLowWatermark();
            setThreadName("record");
            checkpointIfNecessary();
        } finally {
            span.end();
        }
    }

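    /**
     * Builds a span from the trace context carried by the record, or returns {@link BlankSpan#INSTANCE} when the
     * record has no valid trace context.
     */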
    protected Span getSpanFromRecord(Record record) {
        byte[] traceContext = record.getTraceContext();
        if (traceContext == null || traceContext.length == 0) {
            return BlankSpan.INSTANCE;
        }
        Tracer tracer = Tracing.getTracer();
        BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
        try {
            // Build a span with a follows-from relationship to the parent span to denote asynchronous processing
            lastSpanContext = binaryFormat.fromByteArray(traceContext);
            Span span = tracer.spanBuilderWithRemoteParent("comp/" + computation.metadata().name() + "/record", lastSpanContext)
                              .startSpan();
            span.addLink(Link.fromSpanContext(lastSpanContext, Link.Type.PARENT_LINKED_SPAN));

            HashMap<String, AttributeValue> map = new HashMap<>();
            map.put("comp.name", AttributeValue.stringAttributeValue(computation.metadata().name()));
            map.put("comp.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
            map.put("record.key", AttributeValue.stringAttributeValue(record.getKey()));
            map.put("record.offset", AttributeValue.stringAttributeValue(context.getLastOffset().toString()));
            map.put("record.watermark",
                    AttributeValue.stringAttributeValue(Watermark.ofValue(record.getWatermark()).toString()));
            map.put("record.submit_thread", AttributeValue.stringAttributeValue(record.getAppenderThread()));
            map.put("record.data.length", AttributeValue.longAttributeValue(record.getData().length));
            span.putAttributes(map);
            return span;
        } catch (SpanContextParseException e) {
            log.warn("Invalid span context in record: " + record.getKey() + " length: " + traceContext.length);
            return BlankSpan.INSTANCE;
        }
    }

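    /**
     * Runs {@link Computation#processRecord} under the retry policy, falling back to {@link #processFallback} when all
     * retries are exhausted.
     */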
    protected void processRecordWithRetry(String from, Record record) {
        runningCount.inc();
        try (Timer.Context ignored = processRecordTimer.time()) {
            Failsafe.with(policy.getRetryPolicy())
                    .onRetry(failure -> computation.processRetry(context, failure))
                    .onFailure(failure -> computation.processFailure(context, failure))
                    .withFallback(() -> processFallback(context))
                    .run(() -> computation.processRecord(context, from, record));
        } finally {
            runningCount.dec();
        }
    }

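    /**
     * Last-resort handling once retries are exhausted: either skips the record (when the policy allows it or in
     * recovery mode) or terminates the computation.
     */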
    protected void processFallback(ComputationContextImpl context) {
        if (policy.continueOnFailure()) {
            log.error(String.format("Skip record after failure: %s", context.getLastOffset()));
            context.askForCheckpoint();
            recordSkippedCount.inc();
        } else if (skipFailureForRecovery()) {
            log.error(String.format("Skip record after failure instead of terminating because of recovery mode: %s",
                    context.getLastOffset()));
            context.askForCheckpoint();
            recordSkippedCount.inc();
        } else {
            log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
            context.cancelAskForCheckpoint();
            context.askForTermination();
            globalFailureCount.inc();
            failureCount.inc();
        }
    }

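    /**
     * Returns {@code true} while the shared count of skipped failures is below the policy's skipFirstFailures
     * threshold.
     */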
    protected boolean skipFailureForRecovery() {
        if (policy.getSkipFirstFailures() > 0) {
            if (skipFailures.incrementAndGet() <= policy.getSkipFirstFailures()) {
                return true;
            }
        }
        return false;
    }

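    /**
     * Returns a read timeout of at most {@link #READ_TIMEOUT}, shortened when a record was read recently, so that an
     * empty input stream does not throttle the processing of the other ones.
     */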
    protected Duration getTimeoutDuration() {
        // lastReadTime could have been updated by another thread calling onPartitionsAssigned while we compute the
        // difference; no need to synchronize, we don't need an accurate value here
        long adaptedReadTimeout = Math.max(0, System.currentTimeMillis() - lastReadTime);
        // Adapt the duration so we are not throttling when one of the input streams is empty
        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), adaptedReadTimeout));
    }

    protected void checkSourceLowWatermark() {
        long watermark = context.getSourceLowWatermark();
        if (watermark > 0) {
            lowWatermark.mark(Watermark.ofValue(watermark));
            context.setSourceLowWatermark(0);
        }
    }

    protected void checkRecordFlags(Record record) {
        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receives POISON_PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.getFlags().contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

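    /**
     * Performs a checkpoint when the context requires one, logging a failure that may lead to duplicates on resume.
     */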
    protected void checkpointIfNecessary() {
        if (context.requireCheckpoint()) {
            boolean completed = false;
            try {
                checkpoint();
                completed = true;
            } finally {
                if (!completed) {
                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
                }
            }
        }
    }

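    /**
     * Checkpoint sequence: send the pending output records, save timers and state, commit the input positions, then
     * checkpoint the low watermark.
     */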
    protected void checkpoint() {
        sendRecords();
        saveTimers();
        saveState();
        // To simulate a slow checkpoint, add a Thread.sleep(1) here
        saveOffsets();
        lowWatermark.checkpoint();
        context.removeCheckpointFlag();
        inCheckpointRecords = inRecords;
        setThreadName("checkpoint");
        log.debug(metadata.name() + ": checkpoint done");
    }

    protected void saveTimers() {
        // TODO: save timers in the key value store NXP-22112
    }

    protected void saveState() {
        // TODO: save key value store NXP-22112
    }

    protected void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
            Span span = Tracing.getTracer().getCurrentSpan();
            span.addAnnotation("Checkpoint positions at " + Instant.now());
        }
    }

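    /**
     * Appends the records buffered in the context to their output streams, using the low watermark for records that
     * have no watermark set.
     */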
    protected void sendRecords() {
        boolean firstRecord = true;
        for (String stream : metadata.outputStreams()) {
            for (Record record : context.getRecords(stream)) {
                if (record.getWatermark() == 0) {
                    // use low watermark when not set
                    record.setWatermark(lowWatermark.getLow().getValue());
                }
                if (firstRecord) {
                    Span span = Tracing.getTracer().getCurrentSpan();
                    span.addAnnotation("Sending records at " + Instant.now());
                    firstRecord = false;
                }
                streamManager.append(stream, record);
                outRecords++;
            }
            context.getRecords(stream).clear();
        }
    }

    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    protected void setThreadName(String message) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
                + lowWatermark.getLow().getValue() + ",loop:" + counter;
        if (message != null) {
            name += "," + message;
        }
        Thread.currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
        setThreadName("rebalance revoked");
    }

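    /**
     * Rebalance callback invoked once partitions are assigned: resets the context, re-initializes the computation and
     * releases the assignment latch.
     */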
    @Override
    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
        lastReadTime = System.currentTimeMillis();
        boolean isSpare = partitions.isEmpty();
        setThreadName("rebalance assigned");
        // reset the context
        this.context = new ComputationContextImpl(streamManager, metadata, policy, isSpare);
        log.debug(metadata.name() + ": Init isSpare=" + isSpare);
        computation.init(context);
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0;
        assignmentLatch.countDown();
        // what about the watermark?
    }
}