/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

import net.jodah.failsafe.Failsafe;

/**
 * Thread driving a Computation.
 * <p>
 * The runner obtains a {@link Computation} from a supplier, initializes it and then loops, alternating between firing
 * due timers and reading one record from its input tailer, until it is stopped, interrupted, or drained.
 *
 * @since 9.3
 */
@SuppressWarnings("EmptyMethod")
public class ComputationRunner implements Runnable, RebalanceListener {
    /** Upper bound of the duration passed to the tailer for a single read, see {@link #getTimeoutDuration()}. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

    /** While draining, the loop ends once no read (or no timer for a source) happened for longer than this. */
    protected static final long STARVING_TIMEOUT_MS = 1000;

    /** Sleep duration of the loop when an iteration processed neither a timer nor a record. */
    protected static final long INACTIVITY_BREAK_MS = 100;

    private static final Log log = LogFactory.getLog(ComputationRunner.class);

    protected final LogManager logManager;

    protected final ComputationMetadataMapping metadata;

    /** Input tailer, {@code null} when the computation has no input stream. */
    protected final LogTailer<Record> tailer;

    protected final Supplier<Computation> supplier;

    /** Released once an assignment is known, see {@link #waitForAssignments(Duration)}. */
    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);

    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();

    protected final Codec<Record> inputCodec;

    protected final Codec<Record> outputCodec;

    protected final ComputationPolicy policy;

    protected ComputationContextImpl context;

    protected volatile boolean stop;

    protected volatile boolean drain;

    protected Computation computation;

    /** Number of loop iterations. */
    protected long counter;

    /** Number of records read from the tailer. */
    protected long inRecords;

    /** Value of {@link #inRecords} at the time of the last checkpoint. */
    protected long inCheckpointRecords;

    /** Number of records appended to output streams. */
    protected long outRecords;

    protected long lastReadTime = System.currentTimeMillis();

    protected long lastTimerExecution;

    protected String threadName;

    /**
     * Creates a runner and its input tailer.
     * <p>
     * Without input stream there is no tailer. When the log manager supports subscription the runner subscribes to
     * the input streams and registers itself as rebalance listener, otherwise a tailer is created on the given
     * default assignment. In the two non-subscribe cases the assignment latch is released immediately.
     *
     * @param supplier provides the computation instance when the runner starts
     * @param metadata the computation metadata with its stream mapping
     * @param defaultAssignment the partitions to tail when subscribe is not supported
     * @param logManager the log manager used to create tailers and appenders
     * @param inputCodec the codec used to read input records
     * @param outputCodec the codec used to append output records
     * @param policy the computation policy (retry, continue on failure)
     */
    @SuppressWarnings("unchecked")
    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<LogPartition> defaultAssignment, LogManager logManager, Codec<Record> inputCodec,
            Codec<Record> outputCodec, ComputationPolicy policy) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.logManager = logManager;
        this.context = new ComputationContextImpl(logManager, metadata, policy);
        this.inputCodec = inputCodec;
        this.outputCodec = outputCodec;
        this.policy = policy;
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            assignmentLatch.countDown();
        } else if (logManager.supportSubscribe()) {
            this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this, inputCodec);
        } else {
            this.tailer = logManager.createTailer(metadata.name(), defaultAssignment, inputCodec);
            assignmentLatch.countDown();
        }
    }

    /**
     * Asks the runner to leave its loop and forwards the stop signal to the computation if it already exists.
     */
    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
        if (computation != null) {
            computation.signalStop();
        }
    }

    /**
     * Asks the runner to leave its loop once it is starving, see {@link #continueLoop()}.
     */
    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

    /**
     * Waits until the assignment latch is released.
     *
     * @param timeout the maximum time to wait
     * @return {@code true} if the latch was released in time, {@code false} on timeout
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
            return false;
        }
        return true;
    }

    /**
     * Initializes the computation, runs the processing loop, then destroys the computation and closes the tailer.
     * <p>
     * The interrupt flag is restored when the loop ended on an {@link InterruptedException}.
     */
    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        computation = supplier.get();
        try {
            // init is part of the guarded section so that a failing init still releases the tailer
            log.debug(metadata.name() + ": Init");
            computation.init(context);
            log.debug(metadata.name() + ": Start");
            processLoop();
        } catch (InterruptedException e) {
            interrupted = true; // Thread.currentThread().interrupt() in finally
            // this is expected when the pool is shutdownNow
            String msg = metadata.name() + ": Interrupted";
            if (log.isTraceEnabled()) {
                log.debug(msg, e);
            } else {
                log.debug(msg);
            }
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                // this can happen when pool is shutdownNow throwing ClosedByInterruptException
                log.info(metadata.name() + ": Interrupted", e);
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                try {
                    computation.destroy();
                } finally {
                    // always release the tailer, even if destroy fails
                    closeTailer();
                }
                log.debug(metadata.name() + ": Exited");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes the tailer if there is one and it is not already closed.
     */
    protected void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            tailer.close();
        }
    }

    /**
     * Processes timers and records until {@link #continueLoop()} returns {@code false}, sleeping
     * {@link #INACTIVITY_BREAK_MS} after an iteration without activity.
     *
     * @throws InterruptedException if the thread is interrupted while reading or sleeping
     */
    protected void processLoop() throws InterruptedException {
        boolean activity;
        while (continueLoop()) {
            activity = processTimer();
            activity |= processRecord();
            counter++;
            if (!activity) {
                // no timer nor record to process, take a break
                Thread.sleep(INACTIVITY_BREAK_MS);
            }
        }
    }

    /**
     * Tells whether the loop must go on.
     *
     * @return {@code false} when stopped or interrupted, or when draining and starving for more than
     *         {@link #STARVING_TIMEOUT_MS}; {@code true} otherwise
     */
    protected boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        } else if (drain) {
            long now = System.currentTimeMillis();
            // for a source we take lastTimerExecution starvation
            if (metadata.inputStreams().isEmpty()) {
                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
                    return false;
                }
            } else {
                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
                            + inRecords + " records read, " + counter + " reads attempt");
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Fires every timer whose time is less than or equal to the current time, in ascending time order, then
     * checkpoints if the context requires it.
     *
     * @return {@code true} if at least one timer was fired
     */
    protected boolean processTimer() {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (tailer != null && tailer.assignments().isEmpty()) {
            // needed to ensure single source across multiple nodes
            return false;
        }
        long now = System.currentTimeMillis();
        // filter and order timers, working on a copy because firing a timer removes it from the context
        Map<String, Long> dueTimers = timers.entrySet()
                                            .stream()
                                            .filter(entry -> entry.getValue() <= now)
                                            .sorted(Map.Entry.comparingByValue())
                                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                                                    (e1, e2) -> e1, LinkedHashMap::new));
        if (dueTimers.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, Long> timer : dueTimers.entrySet()) {
            context.removeTimer(timer.getKey());
            processTimerWithRetry(timer.getKey(), timer.getValue());
        }
        checkSourceLowWatermark();
        lastTimerExecution = now;
        setThreadName("timer");
        checkpointIfNecessary();
        if (context.requireTerminate()) {
            stop = true;
        }
        return true;
    }

    /**
     * Invokes the computation timer callback under the policy retry policy, notifying the computation on retry and
     * on failure, and applying {@link #processFallback(ComputationContextImpl)} as fallback.
     *
     * @param key the timer key
     * @param value the timer time
     */
    protected void processTimerWithRetry(String key, Long value) {
        Failsafe.with(policy.getRetryPolicy())
                .onRetry(failure -> computation.processRetry(context, failure))
                .onFailure(failure -> computation.processFailure(context, failure))
                .withFallback(() -> processFallback(context))
                .run(() -> computation.processTimer(context, key, value));
    }

    /**
     * Reads at most one record from the tailer and submits it to the computation, then checkpoints if the context
     * requires it.
     *
     * @return {@code true} if a record was processed or if the context requires termination
     * @throws InterruptedException if the thread is interrupted while reading
     */
    protected boolean processRecord() throws InterruptedException {
        if (context.requireTerminate()) {
            stop = true;
            return true;
        }
        if (tailer == null) {
            return false;
        }
        Duration timeoutRead = getTimeoutDuration();
        LogRecord<Record> logRecord = null;
        try {
            logRecord = tailer.read(timeoutRead);
        } catch (RebalanceException e) {
            // the revoke has done a checkpoint we can continue
        }
        Record record;
        if (logRecord != null) {
            record = logRecord.message();
            lastReadTime = System.currentTimeMillis();
            inRecords++;
            lowWatermark.mark(record.getWatermark());
            String from = metadata.reverseMap(logRecord.offset().partition().name());
            context.setLastOffset(logRecord.offset());
            processRecordWithRetry(from, record);
            checkRecordFlags(record);
            checkSourceLowWatermark();
            setThreadName("record");
            checkpointIfNecessary();
            return true;
        }
        return false;
    }

    /**
     * Invokes the computation record callback under the policy retry policy, notifying the computation on retry and
     * on failure, and applying {@link #processFallback(ComputationContextImpl)} as fallback.
     *
     * @param from the input stream name as seen by the computation
     * @param record the record to process
     */
    protected void processRecordWithRetry(String from, Record record) {
        Failsafe.with(policy.getRetryPolicy())
                .onRetry(failure -> computation.processRetry(context, failure))
                .onFailure(failure -> computation.processFailure(context, failure))
                .withFallback(() -> processFallback(context))
                .run(() -> computation.processRecord(context, from, record));
    }

    /**
     * Fallback applied once processing has failed: either asks for a checkpoint to skip the record when the policy
     * continues on failure, or cancels any pending checkpoint request and asks for termination.
     *
     * @param context the computation context
     */
    protected void processFallback(ComputationContextImpl context) {
        if (policy.continueOnFailure()) {
            log.error(String.format("Skip record after failure: %s", context.getLastOffset()));
            context.askForCheckpoint();
        } else {
            log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
            context.cancelAskForCheckpoint();
            context.askForTermination();
        }
    }

    /**
     * Computes the read timeout: the time elapsed since the last read, capped by {@link #READ_TIMEOUT}.
     *
     * @return a duration between zero and {@link #READ_TIMEOUT}
     */
    protected Duration getTimeoutDuration() {
        // Adapt the duration so we are not throttling when one of the input stream is empty
        long elapsed = System.currentTimeMillis() - lastReadTime;
        // clamp to zero so that a system clock moving backwards cannot produce a negative duration
        return Duration.ofMillis(Math.max(0, Math.min(READ_TIMEOUT.toMillis(), elapsed)));
    }

    /**
     * Marks the low watermark with the source low watermark exposed by the context when it is positive, then resets
     * it on the context.
     */
    protected void checkSourceLowWatermark() {
        long watermark = context.getSourceLowWatermark();
        if (watermark > 0) {
            lowWatermark.mark(Watermark.ofValue(watermark));
            context.setSourceLowWatermark(0);
        }
    }

    /**
     * Handles record flags: a poison pill asks for a checkpoint and stops the runner, a commit flag asks for a
     * checkpoint.
     *
     * @param record the record that has just been processed
     */
    protected void checkRecordFlags(Record record) {
        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receive POISON PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.getFlags().contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    /**
     * Runs a checkpoint when the context requires one, logging an error if the checkpoint does not complete.
     */
    protected void checkpointIfNecessary() {
        if (context.requireCheckpoint()) {
            boolean completed = false;
            try {
                checkpoint();
                completed = true;
            } finally {
                if (!completed) {
                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
                }
            }
        }
    }

    /**
     * Sends the pending output records, saves timers and state, commits the input offsets and checkpoints the low
     * watermark, in that order.
     */
    protected void checkpoint() {
        sendRecords();
        saveTimers();
        saveState();
        // To Simulate slow checkpoint add a Thread.sleep(1)
        saveOffsets();
        lowWatermark.checkpoint();
        context.removeCheckpointFlag();
        log.debug(metadata.name() + ": checkpoint");
        inCheckpointRecords = inRecords;
        setThreadName("checkpoint");
    }

    protected void saveTimers() {
        // TODO: save timers in the key value store NXP-22112
    }

    protected void saveState() {
        // TODO: save key value store NXP-22112
    }

    /**
     * Commits the tailer positions when there is a tailer.
     */
    protected void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
        }
    }

    /**
     * Appends the records accumulated in the context to each output stream, then clears them.
     * <p>
     * A record without watermark (value 0) gets the current low watermark.
     */
    protected void sendRecords() {
        for (String stream : metadata.outputStreams()) {
            LogAppender<Record> appender = logManager.getAppender(stream, outputCodec);
            for (Record record : context.getRecords(stream)) {
                if (record.getWatermark() == 0) {
                    // use low watermark when not set
                    record.setWatermark(lowWatermark.getLow().getValue());
                }
                appender.append(record.getKey(), record);
                outRecords++;
            }
            context.getRecords(stream).clear();
        }
    }

    /**
     * @return the low bound of the watermark interval tracked by this runner
     */
    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    /**
     * Renames the current thread to expose the runner counters.
     *
     * @param message an optional suffix appended to the thread name, ignored when {@code null}
     */
    protected void setThreadName(String message) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords
                + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:"
                + lowWatermark.getLow().getValue() + ",loop:" + counter;
        if (message != null) {
            name += "," + message;
        }
        Thread.currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
        setThreadName("rebalance revoked");
    }

    /**
     * Resets the context and re-initializes the computation with it, then releases the assignment latch.
     */
    @Override
    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
        lastReadTime = System.currentTimeMillis();
        setThreadName("rebalance assigned");
        // reset the context
        this.context = new ComputationContextImpl(logManager, metadata, policy);
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0;
        assignmentLatch.countDown();
        // what about watermark ?
    }
}