001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import java.time.Duration;
022import java.util.Collection;
023import java.util.LinkedHashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.function.Supplier;
029import java.util.stream.Collectors;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.lib.stream.computation.Computation;
034import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
035import org.nuxeo.lib.stream.computation.Record;
036import org.nuxeo.lib.stream.computation.Watermark;
037import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
038import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
039import org.nuxeo.lib.stream.log.LogAppender;
040import org.nuxeo.lib.stream.log.LogManager;
041import org.nuxeo.lib.stream.log.LogPartition;
042import org.nuxeo.lib.stream.log.LogRecord;
043import org.nuxeo.lib.stream.log.LogTailer;
044import org.nuxeo.lib.stream.log.RebalanceException;
045import org.nuxeo.lib.stream.log.RebalanceListener;
046
047/**
048 * Thread driving a Computation
049 *
050 * @since 9.3
051 */
052@SuppressWarnings("EmptyMethod")
053public class ComputationRunner implements Runnable, RebalanceListener {
054    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);
055
056    protected static final long STARVING_TIMEOUT_MS = 1000;
057
058    private static final Log log = LogFactory.getLog(ComputationRunner.class);
059
060    protected final LogManager logManager;
061
062    protected final ComputationMetadataMapping metadata;
063
064    protected final LogTailer<Record> tailer;
065
066    protected final Supplier<Computation> supplier;
067
068    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);
069
070    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
071
072    protected ComputationContextImpl context;
073
074    protected volatile boolean stop;
075
076    protected volatile boolean drain;
077
078    protected Computation computation;
079
080    protected long counter;
081
082    protected long inRecords;
083
084    protected long inCheckpointRecords;
085
086    protected long outRecords;
087
088    protected long lastReadTime = System.currentTimeMillis();
089
090    protected long lastTimerExecution;
091
092    protected String threadName;
093
094    @SuppressWarnings("unchecked")
095    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
096            List<LogPartition> defaultAssignment, LogManager logManager) {
097        this.supplier = supplier;
098        this.metadata = metadata;
099        this.logManager = logManager;
100        this.context = new ComputationContextImpl(metadata);
101        if (metadata.inputStreams().isEmpty()) {
102            this.tailer = null;
103            assignmentLatch.countDown();
104        } else if (logManager.supportSubscribe()) {
105            this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this);
106        } else {
107            this.tailer = logManager.createTailer(metadata.name(), defaultAssignment);
108            assignmentLatch.countDown();
109        }
110    }
111
112    public void stop() {
113        log.debug(metadata.name() + ": Receives Stop signal");
114        stop = true;
115    }
116
117    public void drain() {
118        log.debug(metadata.name() + ": Receives Drain signal");
119        drain = true;
120    }
121
122    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
123        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
124            log.warn(metadata.name() + ": Timeout waiting for assignment");
125            return false;
126        }
127        return true;
128    }
129
130    @Override
131    public void run() {
132        threadName = Thread.currentThread().getName();
133        boolean interrupted = false;
134        computation = supplier.get();
135        log.debug(metadata.name() + ": Init");
136        computation.init(context);
137        log.debug(metadata.name() + ": Start");
138        try {
139            processLoop();
140        } catch (InterruptedException e) {
141            // this is expected when the pool is shutdownNow
142            if (log.isTraceEnabled()) {
143                log.debug(metadata.name() + ": Interrupted", e);
144            } else {
145                log.debug(metadata.name() + ": Interrupted");
146            }
147            // the interrupt flag is set after the tailer are closed
148            interrupted = true;
149        } catch (Exception e) {
150            if (Thread.currentThread().isInterrupted()) {
151                // this can happen when pool is shutdownNow throwing ClosedByInterruptException
152                log.info(metadata.name() + ": Interrupted", e);
153                // TODO: test if we should set interrupted = true to rearm the interrupt flag
154            } else {
155                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
156                throw e;
157            }
158        } finally {
159            try {
160                computation.destroy();
161                closeTailer();
162                log.debug(metadata.name() + ": Exited");
163            } finally {
164                if (interrupted) {
165                    Thread.currentThread().interrupt();
166                }
167            }
168        }
169    }
170
171    protected void closeTailer() {
172        if (tailer != null && !tailer.closed()) {
173            tailer.close();
174        }
175    }
176
177    protected void processLoop() throws InterruptedException {
178        while (continueLoop()) {
179            processTimer();
180            processRecord();
181            counter++;
182            // TODO: add pause for computation without inputs or without timer to prevent CPU hogs
183        }
184    }
185
186    protected boolean continueLoop() {
187        if (stop || Thread.currentThread().isInterrupted()) {
188            return false;
189        } else if (drain) {
190            long now = System.currentTimeMillis();
191            // for a source we take lastTimerExecution starvation
192            if (metadata.inputStreams().isEmpty()) {
193                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
194                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
195                    return false;
196                }
197            } else {
198                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
199                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
200                            + inRecords + " records read, " + counter + " reads attempt");
201                    return false;
202                }
203            }
204        }
205        return true;
206    }
207
208    protected void processTimer() {
209        Map<String, Long> timers = context.getTimers();
210        if (timers.isEmpty()) {
211            return;
212        }
213        long now = System.currentTimeMillis();
214        final boolean[] timerUpdate = { false };
215        // filter and order timers
216        LinkedHashMap<String, Long> sortedTimer = timers.entrySet()
217                                                        .stream()
218                                                        .filter(entry -> entry.getValue() <= now)
219                                                        .sorted(Map.Entry.comparingByValue())
220                                                        .collect(
221                                                                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
222                                                                        (e1, e2) -> e1, LinkedHashMap::new));
223        sortedTimer.forEach((key, value) -> {
224            context.removeTimer(key);
225            computation.processTimer(context, key, value);
226            timerUpdate[0] = true;
227        });
228        if (timerUpdate[0]) {
229            checkSourceLowWatermark();
230            lastTimerExecution = now;
231            setThreadName("timer");
232            checkpointIfNecessary();
233        }
234
235    }
236
237    protected void processRecord() throws InterruptedException {
238        if (tailer == null) {
239            return;
240        }
241        Duration timeoutRead = getTimeoutDuration();
242        LogRecord<Record> logRecord = null;
243        try {
244            logRecord = tailer.read(timeoutRead);
245        } catch (RebalanceException e) {
246            // the revoke has done a checkpoint we can continue
247        }
248        Record record;
249        if (logRecord != null) {
250            record = logRecord.message();
251            lastReadTime = System.currentTimeMillis();
252            inRecords++;
253            lowWatermark.mark(record.watermark);
254            String from = metadata.reverseMap(logRecord.offset().partition().name());
255            // System.out.println(metadata.name() + ": Receive from " + from + " record: " + record);
256            computation.processRecord(context, from, record);
257            checkRecordFlags(record);
258            checkSourceLowWatermark();
259            setThreadName("record");
260            checkpointIfNecessary();
261        }
262    }
263
264    protected Duration getTimeoutDuration() {
265        // Adapt the duration so we are not throttling when one of the input stream is empty
266        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime));
267    }
268
269    protected void checkSourceLowWatermark() {
270        long watermark = context.getSourceLowWatermark();
271        if (watermark > 0) {
272            lowWatermark.mark(Watermark.ofValue(watermark));
273            // System.out.println(metadata.name() + ": Set source wm " + lowWatermark);
274            context.setSourceLowWatermark(0);
275        }
276    }
277
278    protected void checkRecordFlags(Record record) {
279        if (record.flags.contains(Record.Flag.POISON_PILL)) {
280            log.info(metadata.name() + ": Receive POISON PILL");
281            context.askForCheckpoint();
282            stop = true;
283        } else if (record.flags.contains(Record.Flag.COMMIT)) {
284            context.askForCheckpoint();
285        }
286    }
287
288    protected void checkpointIfNecessary() {
289        if (context.requireCheckpoint()) {
290            boolean completed = false;
291            try {
292                checkpoint();
293                completed = true;
294                inCheckpointRecords = inRecords;
295            } finally {
296                if (!completed) {
297                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
298                }
299            }
300        }
301    }
302
303    protected void checkpoint() {
304        sendRecords();
305        saveTimers();
306        saveState();
307        // Simulate slow checkpoint
308        // try {
309        // Thread.sleep(1);
310        // } catch (InterruptedException e) {
311        // Thread.currentThread().interrupt();
312        // throw e;
313        // }
314        saveOffsets();
315        lowWatermark.checkpoint();
316        // System.out.println("checkpoint " + metadata.name() + " " + lowWatermark);
317        context.removeCheckpointFlag();
318        log.debug(metadata.name() + ": checkpoint");
319        setThreadName("checkpoint");
320    }
321
322    protected void saveTimers() {
323        // TODO: save timers in the key value store NXP-22112
324    }
325
326    protected void saveState() {
327        // TODO: save key value store NXP-22112
328    }
329
330    protected void saveOffsets() {
331        if (tailer != null) {
332            tailer.commit();
333        }
334    }
335
336    protected void sendRecords() {
337        for (String stream : metadata.outputStreams()) {
338            LogAppender<Record> appender = logManager.getAppender(stream);
339            for (Record record : context.getRecords(stream)) {
340                // System.out.println(metadata.name() + " send record to " + stream + " lowWatermark " + lowWatermark);
341                if (record.watermark == 0) {
342                    // use low watermark when not set
343                    record.watermark = lowWatermark.getLow().getValue();
344                }
345                appender.append(record.key, record);
346                outRecords++;
347            }
348            context.getRecords(stream).clear();
349        }
350    }
351
352    public Watermark getLowWatermark() {
353        return lowWatermark.getLow();
354    }
355
356    protected void setThreadName(String message) {
357        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
358                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
359                + lowWatermark.getLow().getValue() + ",loop:" + counter;
360        if (message != null) {
361            name += "," + message;
362        }
363        Thread.currentThread().setName(name);
364    }
365
366    @Override
367    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
368        setThreadName("rebalance revoked");
369    }
370
371    @Override
372    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
373        lastReadTime = System.currentTimeMillis();
374        setThreadName("rebalance assigned");
375        // reset the context
376        this.context = new ComputationContextImpl(metadata);
377        log.debug(metadata.name() + ": Init");
378        computation.init(context);
379        lastReadTime = System.currentTimeMillis();
380        lastTimerExecution = 0;
381        assignmentLatch.countDown();
382        // what about watermark ?
383    }
384}