/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

/**
 * Thread driving a {@link Computation}: it polls the computation input streams, fires due timers and checkpoints the
 * computation state on request.
 *
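 * A minimal usage sketch (assuming a {@code Computation} supplier, a metadata mapping, a default partition assignment
 * and a {@code LogManager} are available; the variable names below are illustrative):
 *
 * <pre>{@code
 * ComputationRunner runner = new ComputationRunner(supplier, mapping, defaultAssignment, logManager);
 * ExecutorService executor = Executors.newSingleThreadExecutor();
 * executor.submit(runner);
 * runner.waitForAssignments(Duration.ofMinutes(1));
 * // drain lets the computation finish its pending work, stop exits the loop as soon as possible
 * runner.drain();
 * executor.shutdown();
 * }</pre>
 *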
 * @since 9.3
 */
@SuppressWarnings("EmptyMethod")
public class ComputationRunner implements Runnable, RebalanceListener {
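    // Maximum duration of a single tailer read, see getTimeoutDuration()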
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

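    // When draining, exit the loop once no record has been read (or no timer has fired, for a source) for this duration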
    protected static final long STARVING_TIMEOUT_MS = 1000;

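    // Pause between two loop iterations when there is neither a timer nor a record to process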
    protected static final long INACTIVITY_BREAK_MS = 100;

    private static final Log log = LogFactory.getLog(ComputationRunner.class);

    protected final LogManager logManager;

    protected final ComputationMetadataMapping metadata;

    protected final LogTailer<Record> tailer;

    protected final Supplier<Computation> supplier;

    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);

    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();

    protected ComputationContextImpl context;

    protected volatile boolean stop;

    protected volatile boolean drain;

    protected Computation computation;

    protected long counter;

    protected long inRecords;

    protected long inCheckpointRecords;

    protected long outRecords;

    protected long lastReadTime = System.currentTimeMillis();

    protected long lastTimerExecution;

    protected String threadName;

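    /**
     * A computation without input streams gets no tailer; when the log implementation supports subscribe, partitions
     * are assigned dynamically through the rebalance listener, otherwise a tailer is created on the default
     * assignment.
     */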
    @SuppressWarnings("unchecked")
    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<LogPartition> defaultAssignment, LogManager logManager) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.logManager = logManager;
        this.context = new ComputationContextImpl(metadata);
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            assignmentLatch.countDown();
        } else if (logManager.supportSubscribe()) {
            this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this);
        } else {
            this.tailer = logManager.createTailer(metadata.name(), defaultAssignment);
            assignmentLatch.countDown();
        }
    }

    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
    }

    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

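    /**
     * Waits for the initial partition assignment.
     *
     * @return {@code true} if the assignment has been received before the timeout, {@code false} otherwise
     */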
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
            return false;
        }
        return true;
    }

    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        computation = supplier.get();
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        log.debug(metadata.name() + ": Start");
        try {
            processLoop();
        } catch (InterruptedException e) {
            interrupted = true; // Thread.currentThread().interrupt() in finally
            // this is expected when the pool is shut down with shutdownNow
            if (log.isTraceEnabled()) {
                log.debug(metadata.name() + ": Interrupted", e);
            } else {
                log.debug(metadata.name() + ": Interrupted");
            }
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                // this can happen when the pool is shut down with shutdownNow, throwing ClosedByInterruptException
                log.info(metadata.name() + ": Interrupted", e);
                // TODO: test if we should set interrupted = true to rearm the interrupt flag
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                computation.destroy();
                closeTailer();
                log.debug(metadata.name() + ": Exited");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    protected void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            tailer.close();
        }
    }

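    /**
     * Main loop: alternates timer and record processing until asked to stop, drained or interrupted, pausing briefly
     * when there is nothing to do.
     */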
    protected void processLoop() throws InterruptedException {
        boolean activity;
        while (continueLoop()) {
            activity = processTimer();
            activity |= processRecord();
            counter++;
            if (!activity) {
                // neither a timer nor a record to process, take a break
                Thread.sleep(INACTIVITY_BREAK_MS);
            }
        }
    }

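    /**
     * Returns {@code false} when the runner is stopped or interrupted. When draining, also returns {@code false} once
     * the computation has been starving (no timer execution for a source, no record read otherwise) for longer than
     * {@link #STARVING_TIMEOUT_MS}.
     */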
    protected boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        } else if (drain) {
            long now = System.currentTimeMillis();
            // for a source, starvation is detected using lastTimerExecution
            if (metadata.inputStreams().isEmpty()) {
                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of source drain, last timer more than " + STARVING_TIMEOUT_MS
                            + " ms ago");
                    return false;
                }
            } else {
                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of drain, no more input after " + (now - lastReadTime) + " ms, "
                            + inRecords + " records read, " + counter + " read attempts");
                    return false;
                }
            }
        }
        return true;
    }

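    /**
     * Fires all the timers that are due, in chronological order, then checkpoints if requested.
     *
     * @return {@code true} if at least one timer has been processed
     */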
    protected boolean processTimer() {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (tailer != null && tailer.assignments().isEmpty()) {
            // needed to ensure single source across multiple nodes
            return false;
        }
        long now = System.currentTimeMillis();
        final boolean[] timerUpdate = { false };
        // filter and order timers
        LinkedHashMap<String, Long> sortedTimer = timers.entrySet()
                                                        .stream()
                                                        .filter(entry -> entry.getValue() <= now)
                                                        .sorted(Map.Entry.comparingByValue())
                                                        .collect(
                                                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                                                                        (e1, e2) -> e1, LinkedHashMap::new));
        sortedTimer.forEach((key, value) -> {
            context.removeTimer(key);
            computation.processTimer(context, key, value);
            timerUpdate[0] = true;
        });
        if (timerUpdate[0]) {
            checkSourceLowWatermark();
            lastTimerExecution = now;
            setThreadName("timer");
            checkpointIfNecessary();
            if (context.requireTerminate()) {
                stop = true;
            }
            return true;
        }
        return false;
    }

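    /**
     * Reads at most one record from the tailer and submits it to the computation, then checkpoints if requested.
     *
     * @return {@code true} if a record has been processed or a termination has been requested
     */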
    protected boolean processRecord() throws InterruptedException {
        if (context.requireTerminate()) {
            stop = true;
            return true;
        }
        if (tailer == null) {
            return false;
        }
        Duration timeoutRead = getTimeoutDuration();
        LogRecord<Record> logRecord = null;
        try {
            logRecord = tailer.read(timeoutRead);
        } catch (RebalanceException e) {
            // the revoke has done a checkpoint, we can continue
        }
        Record record;
        if (logRecord != null) {
            record = logRecord.message();
            lastReadTime = System.currentTimeMillis();
            inRecords++;
            lowWatermark.mark(record.watermark);
            String from = metadata.reverseMap(logRecord.offset().partition().name());
            // System.out.println(metadata.name() + ": Receive from " + from + " record: " + record);
            computation.processRecord(context, from, record);
            checkRecordFlags(record);
            checkSourceLowWatermark();
            setThreadName("record");
            checkpointIfNecessary();
            return true;
        }
        return false;
    }

    protected Duration getTimeoutDuration() {
        // Adapt the duration so we are not throttling when one of the input streams is empty
        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime));
    }

    protected void checkSourceLowWatermark() {
        long watermark = context.getSourceLowWatermark();
        if (watermark > 0) {
            lowWatermark.mark(Watermark.ofValue(watermark));
            // System.out.println(metadata.name() + ": Set source wm " + lowWatermark);
            context.setSourceLowWatermark(0);
        }
    }

    protected void checkRecordFlags(Record record) {
        if (record.flags.contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receives POISON PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.flags.contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    protected void checkpointIfNecessary() {
        if (context.requireCheckpoint()) {
            boolean completed = false;
            try {
                checkpoint();
                completed = true;
                inCheckpointRecords = inRecords;
            } finally {
                if (!completed) {
                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
                }
            }
        }
    }

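    /**
     * The checkpoint ordering matters: pending output records are sent and the state saved before the input offsets
     * are committed, so that a failure results in duplicates rather than lost records.
     */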
    protected void checkpoint() {
        sendRecords();
        saveTimers();
        saveState();
        // Simulate slow checkpoint
        // try {
        // Thread.sleep(1);
        // } catch (InterruptedException e) {
        // Thread.currentThread().interrupt();
        // throw e;
        // }
        saveOffsets();
        lowWatermark.checkpoint();
        // System.out.println("checkpoint " + metadata.name() + " " + lowWatermark);
        context.removeCheckpointFlag();
        log.debug(metadata.name() + ": checkpoint");
        setThreadName("checkpoint");
    }

    protected void saveTimers() {
        // TODO: save timers in the key value store NXP-22112
    }

    protected void saveState() {
        // TODO: save key value store NXP-22112
    }

    protected void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
        }
    }

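    /**
     * Appends the buffered output records to their streams, stamping records without a watermark with the current low
     * watermark.
     */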
    protected void sendRecords() {
        for (String stream : metadata.outputStreams()) {
            LogAppender<Record> appender = logManager.getAppender(stream);
            for (Record record : context.getRecords(stream)) {
                // System.out.println(metadata.name() + " send record to " + stream + " lowWatermark " + lowWatermark);
                if (record.watermark == 0) {
                    // use low watermark when not set
                    record.watermark = lowWatermark.getLow().getValue();
                }
                appender.append(record.key, record);
                outRecords++;
            }
            context.getRecords(stream).clear();
        }
    }

    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    protected void setThreadName(String message) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
                + lowWatermark.getLow().getValue() + ",loop:" + counter;
        if (message != null) {
            name += "," + message;
        }
        Thread.currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
        setThreadName("rebalance revoked");
    }

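    /**
     * On a new assignment the context is reset and the computation re-initialized before resuming.
     */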
    @Override
    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
        lastReadTime = System.currentTimeMillis();
        setThreadName("rebalance assigned");
        // reset the context
        this.context = new ComputationContextImpl(metadata);
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0;
        assignmentLatch.countDown();
        // what about the watermark?
    }
}