001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import java.time.Duration;
022import java.util.Collection;
023import java.util.LinkedHashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.function.Supplier;
029import java.util.stream.Collectors;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.lib.stream.codec.Codec;
034import org.nuxeo.lib.stream.computation.Computation;
035import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
036import org.nuxeo.lib.stream.computation.ComputationPolicy;
037import org.nuxeo.lib.stream.computation.Record;
038import org.nuxeo.lib.stream.computation.Watermark;
039import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
040import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
041import org.nuxeo.lib.stream.log.LogAppender;
042import org.nuxeo.lib.stream.log.LogManager;
043import org.nuxeo.lib.stream.log.LogPartition;
044import org.nuxeo.lib.stream.log.LogRecord;
045import org.nuxeo.lib.stream.log.LogTailer;
046import org.nuxeo.lib.stream.log.RebalanceException;
047import org.nuxeo.lib.stream.log.RebalanceListener;
048
049import net.jodah.failsafe.Failsafe;
050
051/**
052 * Thread driving a Computation
053 *
054 * @since 9.3
055 */
056@SuppressWarnings("EmptyMethod")
057public class ComputationRunner implements Runnable, RebalanceListener {
058    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);
059
060    protected static final long STARVING_TIMEOUT_MS = 1000;
061
062    protected static final long INACTIVITY_BREAK_MS = 100;
063
064    private static final Log log = LogFactory.getLog(ComputationRunner.class);
065
066    protected final LogManager logManager;
067
068    protected final ComputationMetadataMapping metadata;
069
070    protected final LogTailer<Record> tailer;
071
072    protected final Supplier<Computation> supplier;
073
074    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);
075
076    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
077
078    protected final Codec<Record> inputCodec;
079
080    protected final Codec<Record> outputCodec;
081
082    protected final ComputationPolicy policy;
083
084    protected ComputationContextImpl context;
085
086    protected volatile boolean stop;
087
088    protected volatile boolean drain;
089
090    protected Computation computation;
091
092    protected long counter;
093
094    protected long inRecords;
095
096    protected long inCheckpointRecords;
097
098    protected long outRecords;
099
100    protected long lastReadTime = System.currentTimeMillis();
101
102    protected long lastTimerExecution;
103
104    protected String threadName;
105
106    @SuppressWarnings("unchecked")
107    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
108            List<LogPartition> defaultAssignment, LogManager logManager, Codec<Record> inputCodec,
109            Codec<Record> outputCodec, ComputationPolicy policy) {
110        this.supplier = supplier;
111        this.metadata = metadata;
112        this.logManager = logManager;
113        this.context = new ComputationContextImpl(logManager, metadata, policy);
114        this.inputCodec = inputCodec;
115        this.outputCodec = outputCodec;
116        this.policy = policy;
117        if (metadata.inputStreams().isEmpty()) {
118            this.tailer = null;
119            assignmentLatch.countDown();
120        } else if (logManager.supportSubscribe()) {
121            this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this, inputCodec);
122        } else {
123            this.tailer = logManager.createTailer(metadata.name(), defaultAssignment, inputCodec);
124            assignmentLatch.countDown();
125        }
126    }
127
128    public void stop() {
129        log.debug(metadata.name() + ": Receives Stop signal");
130        stop = true;
131        if (computation != null) {
132            computation.signalStop();
133        }
134    }
135
136    public void drain() {
137        log.debug(metadata.name() + ": Receives Drain signal");
138        drain = true;
139    }
140
141    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
142        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
143            log.warn(metadata.name() + ": Timeout waiting for assignment");
144            return false;
145        }
146        return true;
147    }
148
149    @Override
150    public void run() {
151        threadName = Thread.currentThread().getName();
152        boolean interrupted = false;
153        computation = supplier.get();
154        log.debug(metadata.name() + ": Init");
155        computation.init(context);
156        log.debug(metadata.name() + ": Start");
157        try {
158            processLoop();
159        } catch (InterruptedException e) {
160            interrupted = true; // Thread.currentThread().interrupt() in finally
161            // this is expected when the pool is shutdownNow
162            String msg = metadata.name() + ": Interrupted";
163            if (log.isTraceEnabled()) {
164                log.debug(msg, e);
165            } else {
166                log.debug(msg);
167            }
168        } catch (Exception e) {
169            if (Thread.currentThread().isInterrupted()) {
170                // this can happen when pool is shutdownNow throwing ClosedByInterruptException
171                log.info(metadata.name() + ": Interrupted", e);
172            } else {
173                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
174                throw e;
175            }
176        } finally {
177            try {
178                computation.destroy();
179                closeTailer();
180                log.debug(metadata.name() + ": Exited");
181            } finally {
182                if (interrupted) {
183                    Thread.currentThread().interrupt();
184                }
185            }
186        }
187    }
188
189    protected void closeTailer() {
190        if (tailer != null && !tailer.closed()) {
191            tailer.close();
192        }
193    }
194
195    protected void processLoop() throws InterruptedException {
196        boolean activity;
197        while (continueLoop()) {
198            activity = processTimer();
199            activity |= processRecord();
200            counter++;
201            if (!activity) {
202                // no timer nor record to process, take a break
203                Thread.sleep(INACTIVITY_BREAK_MS);
204            }
205        }
206    }
207
208    protected boolean continueLoop() {
209        if (stop || Thread.currentThread().isInterrupted()) {
210            return false;
211        } else if (drain) {
212            long now = System.currentTimeMillis();
213            // for a source we take lastTimerExecution starvation
214            if (metadata.inputStreams().isEmpty()) {
215                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
216                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
217                    return false;
218                }
219            } else {
220                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
221                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
222                            + inRecords + " records read, " + counter + " reads attempt");
223                    return false;
224                }
225            }
226        }
227        return true;
228    }
229
230    protected boolean processTimer() {
231        Map<String, Long> timers = context.getTimers();
232        if (timers.isEmpty()) {
233            return false;
234        }
235        if (tailer != null && tailer.assignments().isEmpty()) {
236            // needed to ensure single source across multiple nodes
237            return false;
238        }
239        long now = System.currentTimeMillis();
240        final boolean[] timerUpdate = { false };
241        // filter and order timers
242        LinkedHashMap<String, Long> sortedTimer = timers.entrySet()
243                                                        .stream()
244                                                        .filter(entry -> entry.getValue() <= now)
245                                                        .sorted(Map.Entry.comparingByValue())
246                                                        .collect(
247                                                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
248                                                                        (e1, e2) -> e1, LinkedHashMap::new));
249        sortedTimer.forEach((key, value) -> {
250            context.removeTimer(key);
251            processTimerWithRetry(key, value);
252            timerUpdate[0] = true;
253        });
254        if (timerUpdate[0]) {
255            checkSourceLowWatermark();
256            lastTimerExecution = now;
257            setThreadName("timer");
258            checkpointIfNecessary();
259            if (context.requireTerminate()) {
260                stop = true;
261            }
262            return true;
263        }
264        return false;
265    }
266
267    protected void processTimerWithRetry(String key, Long value) {
268        Failsafe.with(policy.getRetryPolicy())
269                .onRetry(failure -> computation.processRetry(context, failure))
270                .onFailure(failure -> computation.processFailure(context, failure))
271                .withFallback(() -> processFallback(context))
272                .run(() -> computation.processTimer(context, key, value));
273    }
274
275    protected boolean processRecord() throws InterruptedException {
276        if (context.requireTerminate()) {
277            stop = true;
278            return true;
279        }
280        if (tailer == null) {
281            return false;
282        }
283        Duration timeoutRead = getTimeoutDuration();
284        LogRecord<Record> logRecord = null;
285        try {
286            logRecord = tailer.read(timeoutRead);
287        } catch (RebalanceException e) {
288            // the revoke has done a checkpoint we can continue
289        }
290        Record record;
291        if (logRecord != null) {
292            record = logRecord.message();
293            lastReadTime = System.currentTimeMillis();
294            inRecords++;
295            lowWatermark.mark(record.getWatermark());
296            String from = metadata.reverseMap(logRecord.offset().partition().name());
297            context.setLastOffset(logRecord.offset());
298            processRecordWithRetry(from, record);
299            checkRecordFlags(record);
300            checkSourceLowWatermark();
301            setThreadName("record");
302            checkpointIfNecessary();
303            return true;
304        }
305        return false;
306    }
307
308    protected void processRecordWithRetry(String from, Record record) {
309        Failsafe.with(policy.getRetryPolicy())
310                .onRetry(failure -> computation.processRetry(context, failure))
311                .onFailure(failure -> computation.processFailure(context, failure))
312                .withFallback(() -> processFallback(context))
313                .run(() -> computation.processRecord(context, from, record));
314    }
315
316    protected void processFallback(ComputationContextImpl context) {
317        if (policy.continueOnFailure()) {
318            log.error(String.format("Skip record after failure: %s", context.getLastOffset()));
319            context.askForCheckpoint();
320        } else {
321            log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
322            context.cancelAskForCheckpoint();
323            context.askForTermination();
324        }
325    }
326
327    protected Duration getTimeoutDuration() {
328        // Adapt the duration so we are not throttling when one of the input stream is empty
329        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime));
330    }
331
332    protected void checkSourceLowWatermark() {
333        long watermark = context.getSourceLowWatermark();
334        if (watermark > 0) {
335            lowWatermark.mark(Watermark.ofValue(watermark));
336            context.setSourceLowWatermark(0);
337        }
338    }
339
340    protected void checkRecordFlags(Record record) {
341        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
342            log.info(metadata.name() + ": Receive POISON PILL");
343            context.askForCheckpoint();
344            stop = true;
345        } else if (record.getFlags().contains(Record.Flag.COMMIT)) {
346            context.askForCheckpoint();
347        }
348    }
349
350    protected void checkpointIfNecessary() {
351        if (context.requireCheckpoint()) {
352            boolean completed = false;
353            try {
354                checkpoint();
355                completed = true;
356            } finally {
357                if (!completed) {
358                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
359                }
360            }
361        }
362    }
363
364    protected void checkpoint() {
365        sendRecords();
366        saveTimers();
367        saveState();
368        // To Simulate slow checkpoint add a Thread.sleep(1)
369        saveOffsets();
370        lowWatermark.checkpoint();
371        context.removeCheckpointFlag();
372        log.debug(metadata.name() + ": checkpoint");
373        inCheckpointRecords = inRecords;
374        setThreadName("checkpoint");
375    }
376
377    protected void saveTimers() {
378        // TODO: save timers in the key value store NXP-22112
379    }
380
381    protected void saveState() {
382        // TODO: save key value store NXP-22112
383    }
384
385    protected void saveOffsets() {
386        if (tailer != null) {
387            tailer.commit();
388        }
389    }
390
391    protected void sendRecords() {
392        for (String stream : metadata.outputStreams()) {
393            LogAppender<Record> appender = logManager.getAppender(stream, outputCodec);
394            for (Record record : context.getRecords(stream)) {
395                if (record.getWatermark() == 0) {
396                    // use low watermark when not set
397                    record.setWatermark(lowWatermark.getLow().getValue());
398                }
399                appender.append(record.getKey(), record);
400                outRecords++;
401            }
402            context.getRecords(stream).clear();
403        }
404    }
405
406    public Watermark getLowWatermark() {
407        return lowWatermark.getLow();
408    }
409
410    protected void setThreadName(String message) {
411        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
412                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
413                + lowWatermark.getLow().getValue() + ",loop:" + counter;
414        if (message != null) {
415            name += "," + message;
416        }
417        Thread.currentThread().setName(name);
418    }
419
420    @Override
421    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
422        setThreadName("rebalance revoked");
423    }
424
425    @Override
426    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
427        lastReadTime = System.currentTimeMillis();
428        setThreadName("rebalance assigned");
429        // reset the context
430        this.context = new ComputationContextImpl(logManager, metadata, policy);
431        log.debug(metadata.name() + ": Init");
432        computation.init(context);
433        lastReadTime = System.currentTimeMillis();
434        lastTimerExecution = 0;
435        assignmentLatch.countDown();
436        // what about watermark ?
437    }
438}