001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import java.time.Duration;
022import java.util.Collection;
023import java.util.LinkedHashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.function.Supplier;
029import java.util.stream.Collectors;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.lib.stream.codec.Codec;
034import org.nuxeo.lib.stream.computation.Computation;
035import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
036import org.nuxeo.lib.stream.computation.Record;
037import org.nuxeo.lib.stream.computation.Watermark;
038import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
039import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
040import org.nuxeo.lib.stream.log.LogAppender;
041import org.nuxeo.lib.stream.log.LogManager;
042import org.nuxeo.lib.stream.log.LogPartition;
043import org.nuxeo.lib.stream.log.LogRecord;
044import org.nuxeo.lib.stream.log.LogTailer;
045import org.nuxeo.lib.stream.log.RebalanceException;
046import org.nuxeo.lib.stream.log.RebalanceListener;
047
048/**
049 * Thread driving a Computation
050 *
051 * @since 9.3
052 */
053@SuppressWarnings("EmptyMethod")
054public class ComputationRunner implements Runnable, RebalanceListener {
055    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);
056
057    protected static final long STARVING_TIMEOUT_MS = 1000;
058
059    protected static final long INACTIVITY_BREAK_MS = 100;
060
061    private static final Log log = LogFactory.getLog(ComputationRunner.class);
062
063    protected final LogManager logManager;
064
065    protected final ComputationMetadataMapping metadata;
066
067    protected final LogTailer<Record> tailer;
068
069    protected final Supplier<Computation> supplier;
070
071    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);
072
073    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
074
075    protected final Codec<Record> inputCodec;
076
077    protected final Codec<Record> outputCodec;
078
079    protected ComputationContextImpl context;
080
081    protected volatile boolean stop;
082
083    protected volatile boolean drain;
084
085    protected Computation computation;
086
087    protected long counter;
088
089    protected long inRecords;
090
091    protected long inCheckpointRecords;
092
093    protected long outRecords;
094
095    protected long lastReadTime = System.currentTimeMillis();
096
097    protected long lastTimerExecution;
098
099    protected String threadName;
100
101    @SuppressWarnings("unchecked")
102    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
103            List<LogPartition> defaultAssignment, LogManager logManager, Codec<Record> inputCodec,
104            Codec<Record> outputCodec) {
105        this.supplier = supplier;
106        this.metadata = metadata;
107        this.logManager = logManager;
108        this.context = new ComputationContextImpl(metadata);
109        this.inputCodec = inputCodec;
110        this.outputCodec = outputCodec;
111        if (metadata.inputStreams().isEmpty()) {
112            this.tailer = null;
113            assignmentLatch.countDown();
114        } else if (logManager.supportSubscribe()) {
115            this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this, inputCodec);
116        } else {
117            this.tailer = logManager.createTailer(metadata.name(), defaultAssignment, inputCodec);
118            assignmentLatch.countDown();
119        }
120    }
121
122    public void stop() {
123        log.debug(metadata.name() + ": Receives Stop signal");
124        stop = true;
125        if (computation != null) {
126            computation.signalStop();
127        }
128    }
129
130    public void drain() {
131        log.debug(metadata.name() + ": Receives Drain signal");
132        drain = true;
133    }
134
135    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
136        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
137            log.warn(metadata.name() + ": Timeout waiting for assignment");
138            return false;
139        }
140        return true;
141    }
142
143    @Override
144    public void run() {
145        threadName = Thread.currentThread().getName();
146        boolean interrupted = false;
147        computation = supplier.get();
148        log.debug(metadata.name() + ": Init");
149        computation.init(context);
150        log.debug(metadata.name() + ": Start");
151        try {
152            processLoop();
153        } catch (InterruptedException e) {
154            interrupted = true; // Thread.currentThread().interrupt() in finally
155            // this is expected when the pool is shutdownNow
156            String msg = metadata.name() + ": Interrupted";
157            if (log.isTraceEnabled()) {
158                log.debug(msg, e);
159            } else {
160                log.debug(msg);
161            }
162        } catch (Exception e) {
163            if (Thread.currentThread().isInterrupted()) {
164                // this can happen when pool is shutdownNow throwing ClosedByInterruptException
165                log.info(metadata.name() + ": Interrupted", e);
166                // TODO: test if we should set interrupted = true to rearm the interrupt flag
167            } else {
168                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
169                throw e;
170            }
171        } finally {
172            try {
173                computation.destroy();
174                closeTailer();
175                log.debug(metadata.name() + ": Exited");
176            } finally {
177                if (interrupted) {
178                    Thread.currentThread().interrupt();
179                }
180            }
181        }
182    }
183
184    protected void closeTailer() {
185        if (tailer != null && !tailer.closed()) {
186            tailer.close();
187        }
188    }
189
190    protected void processLoop() throws InterruptedException {
191        boolean activity;
192        while (continueLoop()) {
193            activity = processTimer();
194            activity |= processRecord();
195            counter++;
196            if (!activity) {
197                // no timer nor record to process, take a break
198                Thread.sleep(INACTIVITY_BREAK_MS);
199            }
200        }
201    }
202
203    protected boolean continueLoop() {
204        if (stop || Thread.currentThread().isInterrupted()) {
205            return false;
206        } else if (drain) {
207            long now = System.currentTimeMillis();
208            // for a source we take lastTimerExecution starvation
209            if (metadata.inputStreams().isEmpty()) {
210                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
211                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
212                    return false;
213                }
214            } else {
215                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
216                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
217                            + inRecords + " records read, " + counter + " reads attempt");
218                    return false;
219                }
220            }
221        }
222        return true;
223    }
224
225    protected boolean processTimer() {
226        Map<String, Long> timers = context.getTimers();
227        if (timers.isEmpty()) {
228            return false;
229        }
230        if (tailer != null && tailer.assignments().isEmpty()) {
231            // needed to ensure single source across multiple nodes
232            return false;
233        }
234        long now = System.currentTimeMillis();
235        final boolean[] timerUpdate = { false };
236        // filter and order timers
237        LinkedHashMap<String, Long> sortedTimer = timers.entrySet()
238                                                        .stream()
239                                                        .filter(entry -> entry.getValue() <= now)
240                                                        .sorted(Map.Entry.comparingByValue())
241                                                        .collect(
242                                                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
243                                                                        (e1, e2) -> e1, LinkedHashMap::new));
244        sortedTimer.forEach((key, value) -> {
245            context.removeTimer(key);
246            computation.processTimer(context, key, value);
247            timerUpdate[0] = true;
248        });
249        if (timerUpdate[0]) {
250            checkSourceLowWatermark();
251            lastTimerExecution = now;
252            setThreadName("timer");
253            checkpointIfNecessary();
254            if (context.requireTerminate()) {
255                stop = true;
256            }
257            return true;
258        }
259        return false;
260    }
261
262    protected boolean processRecord() throws InterruptedException {
263        if (context.requireTerminate()) {
264            stop = true;
265            return true;
266        }
267        if (tailer == null) {
268            return false;
269        }
270        Duration timeoutRead = getTimeoutDuration();
271        LogRecord<Record> logRecord = null;
272        try {
273            logRecord = tailer.read(timeoutRead);
274        } catch (RebalanceException e) {
275            // the revoke has done a checkpoint we can continue
276        }
277        Record record;
278        if (logRecord != null) {
279            record = logRecord.message();
280            lastReadTime = System.currentTimeMillis();
281            inRecords++;
282            lowWatermark.mark(record.getWatermark());
283            String from = metadata.reverseMap(logRecord.offset().partition().name());
284            computation.processRecord(context, from, record);
285            checkRecordFlags(record);
286            checkSourceLowWatermark();
287            setThreadName("record");
288            checkpointIfNecessary();
289            return true;
290        }
291        return false;
292    }
293
294    protected Duration getTimeoutDuration() {
295        // Adapt the duration so we are not throttling when one of the input stream is empty
296        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime));
297    }
298
299    protected void checkSourceLowWatermark() {
300        long watermark = context.getSourceLowWatermark();
301        if (watermark > 0) {
302            lowWatermark.mark(Watermark.ofValue(watermark));
303            context.setSourceLowWatermark(0);
304        }
305    }
306
307    protected void checkRecordFlags(Record record) {
308        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
309            log.info(metadata.name() + ": Receive POISON PILL");
310            context.askForCheckpoint();
311            stop = true;
312        } else if (record.getFlags().contains(Record.Flag.COMMIT)) {
313            context.askForCheckpoint();
314        }
315    }
316
317    protected void checkpointIfNecessary() {
318        if (context.requireCheckpoint()) {
319            boolean completed = false;
320            try {
321                checkpoint();
322                completed = true;
323                inCheckpointRecords = inRecords;
324            } finally {
325                if (!completed) {
326                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
327                }
328            }
329        }
330    }
331
332    protected void checkpoint() {
333        sendRecords();
334        saveTimers();
335        saveState();
336        // To Simulate slow checkpoint add a Thread.sleep(1)
337        saveOffsets();
338        lowWatermark.checkpoint();
339        context.removeCheckpointFlag();
340        log.debug(metadata.name() + ": checkpoint");
341        setThreadName("checkpoint");
342    }
343
344    protected void saveTimers() {
345        // TODO: save timers in the key value store NXP-22112
346    }
347
348    protected void saveState() {
349        // TODO: save key value store NXP-22112
350    }
351
352    protected void saveOffsets() {
353        if (tailer != null) {
354            tailer.commit();
355        }
356    }
357
358    protected void sendRecords() {
359        for (String stream : metadata.outputStreams()) {
360            LogAppender<Record> appender = logManager.getAppender(stream, outputCodec);
361            for (Record record : context.getRecords(stream)) {
362                if (record.getWatermark() == 0) {
363                    // use low watermark when not set
364                    record.setWatermark(lowWatermark.getLow().getValue());
365                }
366                appender.append(record.getKey(), record);
367                outRecords++;
368            }
369            context.getRecords(stream).clear();
370        }
371    }
372
373    public Watermark getLowWatermark() {
374        return lowWatermark.getLow();
375    }
376
377    protected void setThreadName(String message) {
378        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
379                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
380                + lowWatermark.getLow().getValue() + ",loop:" + counter;
381        if (message != null) {
382            name += "," + message;
383        }
384        Thread.currentThread().setName(name);
385    }
386
387    @Override
388    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
389        setThreadName("rebalance revoked");
390    }
391
392    @Override
393    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
394        lastReadTime = System.currentTimeMillis();
395        setThreadName("rebalance assigned");
396        // reset the context
397        this.context = new ComputationContextImpl(metadata);
398        log.debug(metadata.name() + ": Init");
399        computation.init(context);
400        lastReadTime = System.currentTimeMillis();
401        lastTimerExecution = 0;
402        assignmentLatch.countDown();
403        // what about watermark ?
404    }
405}