001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import java.time.Duration; 022import java.util.Collection; 023import java.util.LinkedHashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Supplier; 029import java.util.stream.Collectors; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.lib.stream.computation.Computation; 034import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 035import org.nuxeo.lib.stream.computation.Record; 036import org.nuxeo.lib.stream.computation.Watermark; 037import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl; 038import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval; 039import org.nuxeo.lib.stream.log.LogAppender; 040import org.nuxeo.lib.stream.log.LogManager; 041import org.nuxeo.lib.stream.log.LogPartition; 042import org.nuxeo.lib.stream.log.LogRecord; 043import org.nuxeo.lib.stream.log.LogTailer; 044import org.nuxeo.lib.stream.log.RebalanceException; 045import org.nuxeo.lib.stream.log.RebalanceListener; 046 047/** 048 * Thread driving a Computation 049 * 050 * @since 9.3 051 */ 052@SuppressWarnings("EmptyMethod") 053public class ComputationRunner implements Runnable, RebalanceListener { 054 public static final Duration READ_TIMEOUT = Duration.ofMillis(25); 055 056 protected static final long STARVING_TIMEOUT_MS = 1000; 057 058 private static final Log log = LogFactory.getLog(ComputationRunner.class); 059 060 protected final LogManager logManager; 061 062 protected final ComputationMetadataMapping metadata; 063 064 protected final LogTailer<Record> tailer; 065 066 protected final Supplier<Computation> supplier; 067 068 protected final CountDownLatch assignmentLatch = new CountDownLatch(1); 069 070 protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval(); 071 072 protected ComputationContextImpl context; 073 074 protected volatile boolean stop; 075 076 protected volatile boolean drain; 077 078 protected Computation computation; 079 080 protected long counter; 081 082 protected long inRecords; 083 084 protected long inCheckpointRecords; 085 086 protected long outRecords; 087 088 protected long lastReadTime = System.currentTimeMillis(); 089 090 protected long lastTimerExecution; 091 092 protected String threadName; 093 094 @SuppressWarnings("unchecked") 095 public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 096 List<LogPartition> defaultAssignment, LogManager logManager) { 097 this.supplier = supplier; 098 this.metadata = metadata; 099 this.logManager = logManager; 100 this.context = new ComputationContextImpl(metadata); 101 if (metadata.inputStreams().isEmpty()) { 102 this.tailer = null; 103 assignmentLatch.countDown(); 104 } else if (logManager.supportSubscribe()) { 105 this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this); 106 } else { 107 this.tailer = logManager.createTailer(metadata.name(), defaultAssignment); 108 assignmentLatch.countDown(); 109 } 110 } 111 112 public void stop() { 113 log.debug(metadata.name() + ": Receives Stop signal"); 114 stop = true; 115 } 116 117 public void drain() { 118 log.debug(metadata.name() + ": Receives Drain signal"); 119 drain = true; 120 } 121 122 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 123 if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 124 log.warn(metadata.name() + ": Timeout waiting for assignment"); 125 return false; 126 } 127 return true; 128 } 129 130 @Override 131 public void run() { 132 threadName = Thread.currentThread().getName(); 133 boolean interrupted = false; 134 computation = supplier.get(); 135 log.debug(metadata.name() + ": Init"); 136 computation.init(context); 137 log.debug(metadata.name() + ": Start"); 138 try { 139 processLoop(); 140 } catch (InterruptedException e) { 141 // this is expected when the pool is shutdownNow 142 if (log.isTraceEnabled()) { 143 log.debug(metadata.name() + ": Interrupted", e); 144 } else { 145 log.debug(metadata.name() + ": Interrupted"); 146 } 147 // the interrupt flag is set after the tailer are closed 148 interrupted = true; 149 } catch (Exception e) { 150 if (Thread.currentThread().isInterrupted()) { 151 // this can happen when pool is shutdownNow throwing ClosedByInterruptException 152 log.info(metadata.name() + ": Interrupted", e); 153 // TODO: test if we should set interrupted = true to rearm the interrupt flag 154 } else { 155 log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e); 156 throw e; 157 } 158 } finally { 159 try { 160 computation.destroy(); 161 closeTailer(); 162 log.debug(metadata.name() + ": Exited"); 163 } finally { 164 if (interrupted) { 165 Thread.currentThread().interrupt(); 166 } 167 } 168 } 169 } 170 171 protected void closeTailer() { 172 if (tailer != null && !tailer.closed()) { 173 tailer.close(); 174 } 175 } 176 177 protected void processLoop() throws InterruptedException { 178 while (continueLoop()) { 179 processTimer(); 180 processRecord(); 181 counter++; 182 // TODO: add pause for computation without inputs or without timer to prevent CPU hogs 183 } 184 } 185 186 protected boolean continueLoop() { 187 if (stop || Thread.currentThread().isInterrupted()) { 188 return false; 189 } else if (drain) { 190 long now = System.currentTimeMillis(); 191 // for a source we take lastTimerExecution starvation 192 if (metadata.inputStreams().isEmpty()) { 193 if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) { 194 log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago"); 195 return false; 196 } 197 } else { 198 if ((now - lastReadTime) > STARVING_TIMEOUT_MS) { 199 log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, " 200 + inRecords + " records read, " + counter + " reads attempt"); 201 return false; 202 } 203 } 204 } 205 return true; 206 } 207 208 protected void processTimer() { 209 Map<String, Long> timers = context.getTimers(); 210 if (timers.isEmpty()) { 211 return; 212 } 213 long now = System.currentTimeMillis(); 214 final boolean[] timerUpdate = { false }; 215 // filter and order timers 216 LinkedHashMap<String, Long> sortedTimer = timers.entrySet() 217 .stream() 218 .filter(entry -> entry.getValue() <= now) 219 .sorted(Map.Entry.comparingByValue()) 220 .collect( 221 Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 222 (e1, e2) -> e1, LinkedHashMap::new)); 223 sortedTimer.forEach((key, value) -> { 224 context.removeTimer(key); 225 computation.processTimer(context, key, value); 226 timerUpdate[0] = true; 227 }); 228 if (timerUpdate[0]) { 229 checkSourceLowWatermark(); 230 lastTimerExecution = now; 231 setThreadName("timer"); 232 checkpointIfNecessary(); 233 } 234 235 } 236 237 protected void processRecord() throws InterruptedException { 238 if (tailer == null) { 239 return; 240 } 241 Duration timeoutRead = getTimeoutDuration(); 242 LogRecord<Record> logRecord = null; 243 try { 244 logRecord = tailer.read(timeoutRead); 245 } catch (RebalanceException e) { 246 // the revoke has done a checkpoint we can continue 247 } 248 Record record; 249 if (logRecord != null) { 250 record = logRecord.message(); 251 lastReadTime = System.currentTimeMillis(); 252 inRecords++; 253 lowWatermark.mark(record.watermark); 254 String from = metadata.reverseMap(logRecord.offset().partition().name()); 255 // System.out.println(metadata.name() + ": Receive from " + from + " record: " + record); 256 computation.processRecord(context, from, record); 257 checkRecordFlags(record); 258 checkSourceLowWatermark(); 259 setThreadName("record"); 260 checkpointIfNecessary(); 261 } 262 } 263 264 protected Duration getTimeoutDuration() { 265 // Adapt the duration so we are not throttling when one of the input stream is empty 266 return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime)); 267 } 268 269 protected void checkSourceLowWatermark() { 270 long watermark = context.getSourceLowWatermark(); 271 if (watermark > 0) { 272 lowWatermark.mark(Watermark.ofValue(watermark)); 273 // System.out.println(metadata.name() + ": Set source wm " + lowWatermark); 274 context.setSourceLowWatermark(0); 275 } 276 } 277 278 protected void checkRecordFlags(Record record) { 279 if (record.flags.contains(Record.Flag.POISON_PILL)) { 280 log.info(metadata.name() + ": Receive POISON PILL"); 281 context.askForCheckpoint(); 282 stop = true; 283 } else if (record.flags.contains(Record.Flag.COMMIT)) { 284 context.askForCheckpoint(); 285 } 286 } 287 288 protected void checkpointIfNecessary() { 289 if (context.requireCheckpoint()) { 290 boolean completed = false; 291 try { 292 checkpoint(); 293 completed = true; 294 inCheckpointRecords = inRecords; 295 } finally { 296 if (!completed) { 297 log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates."); 298 } 299 } 300 } 301 } 302 303 protected void checkpoint() { 304 sendRecords(); 305 saveTimers(); 306 saveState(); 307 // Simulate slow checkpoint 308 // try { 309 // Thread.sleep(1); 310 // } catch (InterruptedException e) { 311 // Thread.currentThread().interrupt(); 312 // throw e; 313 // } 314 saveOffsets(); 315 lowWatermark.checkpoint(); 316 // System.out.println("checkpoint " + metadata.name() + " " + lowWatermark); 317 context.removeCheckpointFlag(); 318 log.debug(metadata.name() + ": checkpoint"); 319 setThreadName("checkpoint"); 320 } 321 322 protected void saveTimers() { 323 // TODO: save timers in the key value store NXP-22112 324 } 325 326 protected void saveState() { 327 // TODO: save key value store NXP-22112 328 } 329 330 protected void saveOffsets() { 331 if (tailer != null) { 332 tailer.commit(); 333 } 334 } 335 336 protected void sendRecords() { 337 for (String stream : metadata.outputStreams()) { 338 LogAppender<Record> appender = logManager.getAppender(stream); 339 for (Record record : context.getRecords(stream)) { 340 // System.out.println(metadata.name() + " send record to " + stream + " lowWatermark " + lowWatermark); 341 if (record.watermark == 0) { 342 // use low watermark when not set 343 record.watermark = lowWatermark.getLow().getValue(); 344 } 345 appender.append(record.key, record); 346 outRecords++; 347 } 348 context.getRecords(stream).clear(); 349 } 350 } 351 352 public Watermark getLowWatermark() { 353 return lowWatermark.getLow(); 354 } 355 356 protected void setThreadName(String message) { 357 String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords 358 + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:" 359 + lowWatermark.getLow().getValue() + ",loop:" + counter; 360 if (message != null) { 361 name += "," + message; 362 } 363 Thread.currentThread().setName(name); 364 } 365 366 @Override 367 public void onPartitionsRevoked(Collection<LogPartition> partitions) { 368 setThreadName("rebalance revoked"); 369 } 370 371 @Override 372 public void onPartitionsAssigned(Collection<LogPartition> partitions) { 373 lastReadTime = System.currentTimeMillis(); 374 setThreadName("rebalance assigned"); 375 // reset the context 376 this.context = new ComputationContextImpl(metadata); 377 log.debug(metadata.name() + ": Init"); 378 computation.init(context); 379 lastReadTime = System.currentTimeMillis(); 380 lastTimerExecution = 0; 381 assignmentLatch.countDown(); 382 // what about watermark ? 383 } 384}