/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Computation;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
import org.nuxeo.ecm.platform.importer.mqueues.computation.internals.ComputationContextImpl;
import org.nuxeo.ecm.platform.importer.mqueues.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Thread driving a {@link Computation}.
 *
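 * A minimal usage sketch (the {@code MyComputation} class, the executor wiring and the timeout are illustrative
 * assumptions, not part of this class):
 *
 * <pre>{@code
 * MQComputationRunner runner = new MQComputationRunner(MyComputation::new, metadataMapping,
 *         defaultAssignment, mqManager);
 * ExecutorService pool = Executors.newSingleThreadExecutor();
 * pool.submit(runner);
 * runner.waitForAssignments(Duration.ofMinutes(1));
 * // ... records and timers are processed until stop() or drain() is requested
 * runner.drain();
 * pool.shutdown();
 * }</pre>
 *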
 * @since 9.2
 */
public class MQComputationRunner implements Runnable, MQRebalanceListener {
    private static final Log log = LogFactory.getLog(MQComputationRunner.class);
    private static final long STARVING_TIMEOUT_MS = 1000;
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

    private ComputationContextImpl context;
    private final MQManager<Record> mqManager;
    private final ComputationMetadataMapping metadata;
    private final MQTailer<Record> tailer;
    private final Supplier<Computation> supplier;

    private volatile boolean stop = false;
    private volatile boolean drain = false;
    private CountDownLatch assignmentLatch = new CountDownLatch(1);

    private Computation computation;
    private long counter = 0;
    private long inRecords = 0;
    private long inCheckpointRecords = 0;
    private long outRecords = 0;
    private final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
    private long lastReadTime = System.currentTimeMillis();
    private long lastTimerExecution = 0;
    private String threadName;
    private boolean needContextInitialize = false;

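    /**
     * Creates the runner. When the computation has no input stream no tailer is created; when the manager supports
     * subscribe, partitions are assigned dynamically through {@link MQRebalanceListener}; otherwise a tailer is
     * created on the default assignment.
     */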
    @SuppressWarnings("unchecked")
    public MQComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
                               List<MQPartition> defaultAssignment, MQManager<Record> mqManager) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.mqManager = mqManager;
        this.context = new ComputationContextImpl(metadata);
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            assignmentLatch.countDown();
        } else if (mqManager.supportSubscribe()) {
            this.tailer = mqManager.subscribe(metadata.name(), metadata.inputStreams(), this);
        } else {
            this.tailer = mqManager.createTailer(metadata.name(), defaultAssignment);
            assignmentLatch.countDown();
        }
    }

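    /** Requests the runner to exit the processing loop as soon as possible. */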
    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
    }

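    /** Requests the runner to exit once the computation is starved of input (or of timer executions for a source). */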
    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

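    /** Waits for partitions to be assigned, returning {@code false} when the timeout is reached. */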
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
            return false;
        }
        return true;
    }

    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        computation = supplier.get();
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        log.debug(metadata.name() + ": Start");
        try {
            processLoop();
        } catch (InterruptedException e) {
            // this is expected when the pool is shut down with shutdownNow
            // log the stack trace only when trace level is enabled
            if (log.isTraceEnabled()) {
                log.debug(metadata.name() + ": Interrupted", e);
            } else {
                log.debug(metadata.name() + ": Interrupted");
            }
            // the interrupt flag is restored after the tailer is closed
            interrupted = true;
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                // this can happen when the pool is shut down with shutdownNow, throwing a ClosedByInterruptException
                log.info(metadata.name() + ": Interrupted", e);
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                computation.destroy();
                closeTailer();
                log.debug(metadata.name() + ": Exited");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            try {
                tailer.close();
            } catch (Exception e) {
                log.debug(metadata.name() + ": Exception while closing tailer", e);
            }
        }
    }

    private void processLoop() throws InterruptedException {
        while (continueLoop()) {
            processTimer();
            processRecord();
            counter++;
            // TODO: add a pause for computations without input or without timers to avoid hogging the CPU
        }
    }

    private boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        } else if (drain) {
            long now = System.currentTimeMillis();
            // for a source computation, starvation is based on the last timer execution
            if (metadata.inputStreams().isEmpty()) {
                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
                    return false;
                }
            } else {
                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) +
                            " ms, " + inRecords + " records read, " + counter + " read attempts");
                    return false;
                }
            }
        }
        return true;
    }

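    /** Fires the timers that are due, in deadline order, then checkpoints if requested. */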
    private void processTimer() throws InterruptedException {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        final boolean[] timerUpdate = {false};
        // filter and order timers
        LinkedHashMap<String, Long> sortedTimer = timers.entrySet().stream()
                .filter(entry -> entry.getValue() <= now)
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        sortedTimer.forEach((key, value) -> {
            context.removeTimer(key);
            computation.processTimer(context, key, value);
            timerUpdate[0] = true;
        });
        if (timerUpdate[0]) {
            checkSourceLowWatermark();
            lastTimerExecution = now;
            setThreadName("timer");
            checkpointIfNecessary();
        }
    }

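    /** Reads at most one record from the tailer and hands it to the computation, then checkpoints if requested. */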
    private void processRecord() throws InterruptedException {
        if (tailer == null) {
            return;
        }
        Duration timeoutRead = getTimeoutDuration();
        MQRecord<Record> mqRecord = null;
        try {
            mqRecord = tailer.read(timeoutRead);
        } catch (MQRebalanceException e) {
            // the revoke has already triggered a checkpoint, we can continue
        }
        Record record;
        if (mqRecord != null) {
            record = mqRecord.message();
            lastReadTime = System.currentTimeMillis();
            inRecords++;
            lowWatermark.mark(record.watermark);
            String from = metadata.reverseMap(mqRecord.partition().name());
            // System.out.println(metadata.name() + ": Receive from " + from + " record: " + record);
            computation.processRecord(context, from, record);
            checkRecordFlags(record);
            checkSourceLowWatermark();
            setThreadName("record");
            checkpointIfNecessary();
        }
    }

    private Duration getTimeoutDuration() {
        // Adapt the duration so we are not throttled when one of the input streams is empty
        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime));
    }

    private void checkSourceLowWatermark() {
        long watermark = context.getSourceLowWatermark();
        if (watermark > 0) {
            lowWatermark.mark(Watermark.ofValue(watermark));
            // System.out.println(metadata.name() + ": Set source wm " + lowWatermark);
            context.setSourceLowWatermark(0);
        }
    }

    private void checkRecordFlags(Record record) {
        if (record.flags.contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receives POISON PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.flags.contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    private void checkpointIfNecessary() throws InterruptedException {
        if (context.requireCheckpoint()) {
            boolean completed = false;
            try {
                checkpoint();
                completed = true;
                inCheckpointRecords = inRecords;
            } finally {
                if (!completed) {
                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
                }
            }
        }
    }

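    /**
     * Sends the pending output records, then saves timers and state, and commits the input offsets last; as noted in
     * the failure message above, an incomplete checkpoint can therefore produce duplicates on resume.
     */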
    private void checkpoint() throws InterruptedException {
        sendRecords();
        saveTimers();
        saveState();
        // Simulate slow checkpoint
//        try {
//            Thread.sleep(1);
//        } catch (InterruptedException e) {
//            Thread.currentThread().interrupt();
//            throw e;
//        }
        saveOffsets();
        lowWatermark.checkpoint();
        // System.out.println("checkpoint " + metadata.name() + " " + lowWatermark);
        context.removeCheckpointFlag();
        log.debug(metadata.name() + ": checkpoint");
        setThreadName("checkpoint");
    }

    private void saveTimers() {
        // TODO save timers in the key value store
    }

    private void saveState() {
        // TODO save key value store
    }

    private void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
        }
    }

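    /** Appends the buffered output records to their streams, defaulting the watermark to the current low watermark. */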
    private void sendRecords() {
        for (String ostream : metadata.outputStreams()) {
            MQAppender<Record> appender = mqManager.getAppender(ostream);
            for (Record record : context.getRecords(ostream)) {
                // System.out.println(metadata.name() + " send record to " + ostream + " lowwm " + lowWatermark);
                if (record.watermark == 0) {
                    // use the low watermark when not set
                    record.watermark = lowWatermark.getLow().getValue();
                }
                appender.append(record.key, record);
                outRecords++;
            }
            context.getRecords(ostream).clear();
        }
    }

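    /** Returns the low watermark of this computation. */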
    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    private void setThreadName(String message) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords +
                ",lastRead:" + lastReadTime +
                ",lastTimer:" + lastTimerExecution + ",wm:" + lowWatermark.getLow().getValue() +
                ",loop:" + counter;
        if (message != null) {
            name += "," + message;
        }
        Thread.currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<MQPartition> partitions) {
        setThreadName("rebalance revoked");
    }

    @Override
    public void onPartitionsAssigned(Collection<MQPartition> partitions) {
        lastReadTime = System.currentTimeMillis();
        setThreadName("rebalance assigned");
        // reset the context
        this.context = new ComputationContextImpl(metadata);
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0;
        assignmentLatch.countDown();
        // what about the watermark?
    }
}