001/* 002 * (C) Copyright 2017-2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import java.time.Duration; 022import java.time.Instant; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.LinkedHashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.function.Supplier; 032import java.util.stream.Collectors; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.lib.stream.computation.Computation; 037import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 038import org.nuxeo.lib.stream.computation.ComputationPolicy; 039import org.nuxeo.lib.stream.computation.Record; 040import org.nuxeo.lib.stream.computation.Watermark; 041import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl; 042import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval; 043import org.nuxeo.lib.stream.log.LogPartition; 044import org.nuxeo.lib.stream.log.LogRecord; 045import org.nuxeo.lib.stream.log.LogTailer; 046import org.nuxeo.lib.stream.log.Name; 047import org.nuxeo.lib.stream.log.RebalanceException; 048import org.nuxeo.lib.stream.log.RebalanceListener; 049 050import io.dropwizard.metrics5.Counter; 051import io.dropwizard.metrics5.MetricName; 052import io.dropwizard.metrics5.MetricRegistry; 053import io.dropwizard.metrics5.SharedMetricRegistries; 054import io.dropwizard.metrics5.Timer; 055import io.opencensus.common.Scope; 056import io.opencensus.trace.AttributeValue; 057import io.opencensus.trace.BlankSpan; 058import io.opencensus.trace.Link; 059import io.opencensus.trace.Span; 060import io.opencensus.trace.SpanContext; 061import io.opencensus.trace.Tracer; 062import io.opencensus.trace.Tracing; 063import io.opencensus.trace.propagation.BinaryFormat; 064import io.opencensus.trace.propagation.SpanContextParseException; 065 066import net.jodah.failsafe.Failsafe; 067 068/** 069 * Thread driving a Computation 070 * 071 * @since 9.3 072 */ 073@SuppressWarnings("EmptyMethod") 074public class ComputationRunner implements Runnable, RebalanceListener { 075 public static final Duration READ_TIMEOUT = Duration.ofMillis(25); 076 077 protected static final long STARVING_TIMEOUT_MS = 1000; 078 079 protected static final long INACTIVITY_BREAK_MS = 100; 080 081 private static final Log log = LogFactory.getLog(ComputationRunner.class); 082 083 protected final LogStreamManager streamManager; 084 085 protected final ComputationMetadataMapping metadata; 086 087 protected final LogTailer<Record> tailer; 088 089 protected final Supplier<Computation> supplier; 090 091 protected final CountDownLatch assignmentLatch = new CountDownLatch(1); 092 093 protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval(); 094 095 protected final ComputationPolicy policy; 096 097 protected ComputationContextImpl context; 098 099 protected volatile boolean stop; 100 101 protected volatile boolean drain; 102 103 protected Computation computation; 104 105 protected long counter; 106 107 protected long inRecords; 108 109 protected long inCheckpointRecords; 110 111 protected long outRecords; 112 113 protected long lastReadTime = System.currentTimeMillis(); 114 115 protected long lastTimerExecution; 116 117 protected String threadName; 118 119 protected List<LogPartition> defaultAssignment; 120 121 // @since 11.1 122 // Use the Nuxeo registry name without adding dependency on nuxeo-runtime 123 public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService"; 124 125 public static final String GLOBAL_FAILURE_COUNT_REGISTRY_NAME = MetricRegistry.name("nuxeo", "streams", "failure") 126 .getKey(); 127 128 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME); 129 130 protected Counter globalFailureCount; 131 132 protected Counter failureCount; 133 134 protected Counter recordSkippedCount; 135 136 protected Counter runningCount; 137 138 protected Timer processRecordTimer; 139 140 protected Timer processTimerTimer; 141 142 // @since 11.1 143 protected static AtomicInteger skipFailures = new AtomicInteger(0); 144 145 // @since 11.1 146 protected boolean recordActivity; 147 148 protected SpanContext lastSpanContext; 149 150 @SuppressWarnings("unchecked") 151 public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 152 List<LogPartition> defaultAssignment, LogStreamManager streamManager, ComputationPolicy policy) { 153 this.supplier = supplier; 154 this.metadata = metadata; 155 this.streamManager = streamManager; 156 this.policy = policy; 157 if (metadata.inputStreams().isEmpty()) { 158 this.tailer = null; 159 this.context = new ComputationContextImpl(streamManager, metadata, policy, false); 160 assignmentLatch.countDown(); 161 } else if (streamManager.supportSubscribe()) { 162 this.tailer = streamManager.subscribe(Name.ofUrn(metadata.name()), 163 metadata.inputStreams().stream().map(Name::ofUrn).collect(Collectors.toList()), this); 164 // create a spare context until the assignment is done 165 this.context = new ComputationContextImpl(streamManager, metadata, policy, true); 166 } else { 167 this.context = new ComputationContextImpl(streamManager, metadata, policy, defaultAssignment.isEmpty()); 168 this.tailer = streamManager.createTailer(Name.ofUrn(metadata.name()), defaultAssignment); 169 assignmentLatch.countDown(); 170 } 171 this.defaultAssignment = defaultAssignment; 172 } 173 174 public void stop() { 175 log.debug(metadata.name() + ": Receives Stop signal"); 176 stop = true; 177 if (computation != null) { 178 computation.signalStop(); 179 } 180 } 181 182 public void drain() { 183 log.debug(metadata.name() + ": Receives Drain signal"); 184 drain = true; 185 } 186 187 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 188 if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 189 log.warn(metadata.name() + ": Timeout waiting for assignment"); 190 return false; 191 } 192 return true; 193 } 194 195 @Override 196 public void run() { 197 threadName = Thread.currentThread().getName(); 198 boolean interrupted = false; 199 boolean normalTermination = false; 200 computation = supplier.get(); 201 log.debug(metadata.name() + ": Init"); 202 registerMetrics(); 203 try { 204 computation.init(context); 205 log.debug(metadata.name() + ": Start"); 206 processLoop(); 207 normalTermination = true; 208 } catch (InterruptedException e) { 209 interrupted = true; // Thread.currentThread().interrupt() in finally 210 // this is expected when the pool is shutdownNow 211 String msg = metadata.name() + ": Interrupted"; 212 if (log.isTraceEnabled()) { 213 log.debug(msg, e); 214 } else { 215 log.debug(msg); 216 } 217 } catch (Exception e) { 218 if (Thread.interrupted()) { 219 // clearing the interrupt flag is wanted as closeTailer method needs an non-interrupted thread 220 interrupted = true; // Thread.currentThread().interrupt() in finally 221 // this can happen when pool is shutdownNow throwing ClosedByInterruptException 222 log.info(metadata.name() + ": Interrupted", e); 223 } else { 224 log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e); 225 throw e; 226 } 227 } finally { 228 try { 229 computation.destroy(); 230 closeTailer(); 231 } finally { 232 if (interrupted) { 233 Thread.currentThread().interrupt(); 234 } 235 } 236 if (normalTermination || interrupted) { 237 log.debug(metadata.name() + ": Terminated " + (interrupted ? "interrupted" : "normal")); 238 } else { 239 // Terminating because of unexpected error in the ComputationRunner code 240 log.error(String.format("Terminate computation: %s due to previous failure", metadata.name())); 241 globalFailureCount.inc(); 242 failureCount.inc(); 243 } 244 } 245 } 246 247 protected void registerMetrics() { 248 globalFailureCount = registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME); 249 String name = Name.ofUrn(metadata.name()).getId(); 250 runningCount = registry.counter( 251 MetricName.build("nuxeo.streams.computation.running").tagged("computation", name)); 252 failureCount = registry.counter( 253 MetricName.build("nuxeo.streams.computation.failure").tagged("computation", name)); 254 recordSkippedCount = registry.counter( 255 MetricName.build("nuxeo.streams.computation.skippedRecord").tagged("computation", name)); 256 processRecordTimer = registry.timer( 257 MetricName.build("nuxeo.streams.computation.processRecord").tagged("computation", name)); 258 processTimerTimer = registry.timer( 259 MetricName.build("nuxeo.streams.computation.processTimer").tagged("computation", name)); 260 } 261 262 protected void closeTailer() { 263 if (tailer != null && !tailer.closed()) { 264 tailer.close(); 265 } 266 } 267 268 protected void processLoop() throws InterruptedException { 269 boolean timerActivity; 270 while (continueLoop()) { 271 timerActivity = processTimer(); 272 recordActivity = processRecord(); 273 counter++; 274 if (!timerActivity && !recordActivity) { 275 // no activity take a break 276 Thread.sleep(INACTIVITY_BREAK_MS); 277 } 278 } 279 } 280 281 protected boolean continueLoop() { 282 if (stop || Thread.currentThread().isInterrupted()) { 283 log.debug(metadata.name() + ": Stop processing " + (stop ? "stop required" : " interrupted")); 284 return false; 285 } else if (drain) { 286 long now = System.currentTimeMillis(); 287 if (metadata.inputStreams().isEmpty()) { 288 // for a source we take lastTimerExecution starvation 289 if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) { 290 log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago"); 291 return false; 292 } 293 } else if (!recordActivity && (now - lastReadTime) > STARVING_TIMEOUT_MS) { 294 log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, " 295 + inRecords + " records read, " + counter + " reads attempt"); 296 return false; 297 } 298 } 299 return true; 300 } 301 302 protected boolean processTimer() { 303 Map<String, Long> timers = context.getTimers(); 304 if (timers.isEmpty()) { 305 return false; 306 } 307 if (tailer != null && tailer.assignments().isEmpty()) { 308 // needed to ensure single source across multiple nodes 309 return false; 310 } 311 long now = System.currentTimeMillis(); 312 // filter and order timers 313 LinkedHashMap<String, Long> sortedTimer = timers.entrySet() 314 .stream() 315 .filter(entry -> entry.getValue() <= now) 316 .sorted(Map.Entry.comparingByValue()) 317 .collect( 318 Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 319 (e1, e2) -> e1, LinkedHashMap::new)); 320 if (sortedTimer.isEmpty()) { 321 return false; 322 } 323 return processTimerWithTracing(now, sortedTimer); 324 } 325 326 protected boolean processTimerWithTracing(long now, LinkedHashMap<String, Long> sortedTimer) { 327 Tracer tracer = Tracing.getTracer(); 328 Span span; 329 if (lastSpanContext != null) { 330 span = tracer.spanBuilderWithRemoteParent("comp/" + computation.metadata().name() + "/timer", 331 lastSpanContext).startSpan(); 332 span.addLink(Link.fromSpanContext(lastSpanContext, Link.Type.PARENT_LINKED_SPAN)); 333 HashMap<String, AttributeValue> map = new HashMap<>(); 334 map.put("comp.name", AttributeValue.stringAttributeValue(computation.metadata().name())); 335 map.put("comp.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName())); 336 map.put("record.last_offset", AttributeValue.stringAttributeValue(context.getLastOffset().toString())); 337 span.putAttributes(map); 338 lastSpanContext = null; 339 } else { 340 span = BlankSpan.INSTANCE; 341 } 342 try (Scope scope = Tracing.getTracer().withSpan(span)) { 343 final boolean[] timerUpdate = { false }; 344 sortedTimer.forEach((key, value) -> { 345 context.removeTimer(key); 346 processTimerWithRetry(key, value); 347 timerUpdate[0] = true; 348 }); 349 if (timerUpdate[0]) { 350 checkSourceLowWatermark(); 351 lastTimerExecution = now; 352 setThreadName("timer"); 353 checkpointIfNecessary(); 354 if (context.requireTerminate()) { 355 stop = true; 356 } 357 return true; 358 } 359 return false; 360 } finally { 361 span.end(); 362 } 363 } 364 365 protected void processTimerWithRetry(String key, Long value) { 366 try (Timer.Context ignored = processTimerTimer.time()) { 367 Failsafe.with(policy.getRetryPolicy()) 368 .onRetry(failure -> computation.processRetry(context, failure)) 369 .onFailure(failure -> computation.processFailure(context, failure)) 370 .withFallback(() -> processFallback(context)) 371 .run(() -> computation.processTimer(context, key, value)); 372 } 373 } 374 375 protected boolean processRecord() throws InterruptedException { 376 if (context.requireTerminate()) { 377 stop = true; 378 return true; 379 } 380 if (tailer == null) { 381 return false; 382 } 383 Duration timeoutRead = getTimeoutDuration(); 384 LogRecord<Record> logRecord = null; 385 try { 386 logRecord = tailer.read(timeoutRead); 387 } catch (RebalanceException e) { 388 // the revoke has done a checkpoint we can continue 389 } 390 Record record; 391 if (logRecord != null) { 392 record = logRecord.message(); 393 Name stream = logRecord.offset().partition().name(); 394 Record filteredRecord = streamManager.getFilter(stream).afterRead(record, logRecord.offset()); 395 if (filteredRecord == null) { 396 if (log.isDebugEnabled()) { 397 log.debug("Filtering skip record: " + record); 398 } 399 return false; 400 } else if (filteredRecord != record) { 401 logRecord = new LogRecord<>(filteredRecord, logRecord.offset()); 402 record = filteredRecord; 403 } 404 lastReadTime = System.currentTimeMillis(); 405 inRecords++; 406 lowWatermark.mark(record.getWatermark()); 407 context.setLastOffset(logRecord.offset()); 408 String from = metadata.reverseMap(stream.getUrn()); 409 processRecordWithTracing(from, record); 410 return true; 411 } 412 return false; 413 } 414 415 protected void processRecordWithTracing(String from, Record record) { 416 Span span = getSpanFromRecord(record); 417 try (Scope scope = Tracing.getTracer().withSpan(span)) { 418 processRecordWithRetry(from, record); 419 checkRecordFlags(record); 420 checkSourceLowWatermark(); 421 setThreadName("record"); 422 checkpointIfNecessary(); 423 } finally { 424 span.end(); 425 } 426 } 427 428 protected Span getSpanFromRecord(Record record) { 429 byte[] traceContext = record.getTraceContext(); 430 if (traceContext == null || traceContext.length == 0) { 431 return BlankSpan.INSTANCE; 432 } 433 Tracer tracer = Tracing.getTracer(); 434 BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat(); 435 try { 436 // Build a span that has a follows from relationship with the parent span to denote an async processing 437 lastSpanContext = binaryFormat.fromByteArray(traceContext); 438 Span span = tracer.spanBuilderWithRemoteParent("comp/" + computation.metadata().name() + "/record", lastSpanContext) 439 .startSpan(); 440 span.addLink(Link.fromSpanContext(lastSpanContext, Link.Type.PARENT_LINKED_SPAN)); 441 442 HashMap<String, AttributeValue> map = new HashMap<>(); 443 map.put("comp.name", AttributeValue.stringAttributeValue(computation.metadata().name())); 444 map.put("comp.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName())); 445 map.put("record.key", AttributeValue.stringAttributeValue(record.getKey())); 446 map.put("record.offset", AttributeValue.stringAttributeValue(context.getLastOffset().toString())); 447 map.put("record.watermark", 448 AttributeValue.stringAttributeValue(Watermark.ofValue(record.getWatermark()).toString())); 449 map.put("record.submit_thread", AttributeValue.stringAttributeValue(record.getAppenderThread())); 450 map.put("record.data.length", AttributeValue.longAttributeValue(record.getData().length)); 451 span.putAttributes(map); 452 return span; 453 } catch (SpanContextParseException e) { 454 log.warn("Invalid span context in record: " + record.getKey() + " length: " + traceContext.length); 455 return BlankSpan.INSTANCE; 456 } 457 } 458 459 protected void processRecordWithRetry(String from, Record record) { 460 runningCount.inc(); 461 try (Timer.Context ignored = processRecordTimer.time()) { 462 Failsafe.with(policy.getRetryPolicy()) 463 .onRetry(failure -> computation.processRetry(context, failure)) 464 .onFailure(failure -> computation.processFailure(context, failure)) 465 .withFallback(() -> processFallback(context)) 466 .run(() -> computation.processRecord(context, from, record)); 467 } finally { 468 runningCount.dec(); 469 } 470 } 471 472 protected void processFallback(ComputationContextImpl context) { 473 if (policy.continueOnFailure()) { 474 log.error(String.format("Skip record after failure: %s", context.getLastOffset())); 475 context.askForCheckpoint(); 476 recordSkippedCount.inc(); 477 } else if (skipFailureForRecovery()) { 478 log.error(String.format("Skip record after failure instead of terminating because of recovery mode: %s", 479 context.getLastOffset())); 480 context.askForCheckpoint(); 481 recordSkippedCount.inc(); 482 } else { 483 log.error(String.format("Terminate computation: %s due to previous failure", metadata.name())); 484 context.cancelAskForCheckpoint(); 485 context.askForTermination(); 486 globalFailureCount.inc(); 487 failureCount.inc(); 488 } 489 } 490 491 protected boolean skipFailureForRecovery() { 492 if (policy.getSkipFirstFailures() > 0) { 493 if (skipFailures.incrementAndGet() <= policy.getSkipFirstFailures()) { 494 return true; 495 } 496 } 497 return false; 498 } 499 500 protected Duration getTimeoutDuration() { 501 // lastReadTime could have been updated by another thread calling onPartitionsAssigned when doing minus 502 // no need to synchronize it, we don't want an accurate value there 503 long adaptedReadTimeout = Math.max(0, System.currentTimeMillis() - lastReadTime); 504 // Adapt the duration so we are not throttling when one of the input stream is empty 505 return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), adaptedReadTimeout)); 506 } 507 508 protected void checkSourceLowWatermark() { 509 long watermark = context.getSourceLowWatermark(); 510 if (watermark > 0) { 511 lowWatermark.mark(Watermark.ofValue(watermark)); 512 context.setSourceLowWatermark(0); 513 } 514 } 515 516 protected void checkRecordFlags(Record record) { 517 if (record.getFlags().contains(Record.Flag.POISON_PILL)) { 518 log.info(metadata.name() + ": Receive POISON PILL"); 519 context.askForCheckpoint(); 520 stop = true; 521 } else if (record.getFlags().contains(Record.Flag.COMMIT)) { 522 context.askForCheckpoint(); 523 } 524 } 525 526 protected void checkpointIfNecessary() { 527 if (context.requireCheckpoint()) { 528 boolean completed = false; 529 try { 530 checkpoint(); 531 completed = true; 532 } finally { 533 if (!completed) { 534 log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates."); 535 } 536 } 537 } 538 } 539 540 protected void checkpoint() { 541 sendRecords(); 542 saveTimers(); 543 saveState(); 544 // To Simulate slow checkpoint add a Thread.sleep(1) 545 saveOffsets(); 546 lowWatermark.checkpoint(); 547 context.removeCheckpointFlag(); 548 inCheckpointRecords = inRecords; 549 setThreadName("checkpoint"); 550 log.debug(metadata.name() + ": checkpoint done"); 551 } 552 553 protected void saveTimers() { 554 // TODO: save timers in the key value store NXP-22112 555 } 556 557 protected void saveState() { 558 // TODO: save key value store NXP-22112 559 } 560 561 protected void saveOffsets() { 562 if (tailer != null) { 563 tailer.commit(); 564 Span span = Tracing.getTracer().getCurrentSpan(); 565 span.addAnnotation("Checkpoint positions at " + Instant.now()); 566 } 567 } 568 569 protected void sendRecords() { 570 boolean firstRecord = true; 571 for (String stream : metadata.outputStreams()) { 572 for (Record record : context.getRecords(stream)) { 573 if (record.getWatermark() == 0) { 574 // use low watermark when not set 575 record.setWatermark(lowWatermark.getLow().getValue()); 576 } 577 if (firstRecord) { 578 Span span = Tracing.getTracer().getCurrentSpan(); 579 span.addAnnotation("Sending records at " + Instant.now()); 580 firstRecord = false; 581 } 582 streamManager.append(stream, record); 583 outRecords++; 584 } 585 context.getRecords(stream).clear(); 586 } 587 } 588 589 public Watermark getLowWatermark() { 590 return lowWatermark.getLow(); 591 } 592 593 protected void setThreadName(String message) { 594 String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords 595 + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:" 596 + lowWatermark.getLow().getValue() + ",loop:" + counter; 597 if (message != null) { 598 name += "," + message; 599 } 600 Thread.currentThread().setName(name); 601 } 602 603 @Override 604 public void onPartitionsRevoked(Collection<LogPartition> partitions) { 605 setThreadName("rebalance revoked"); 606 } 607 608 @Override 609 public void onPartitionsAssigned(Collection<LogPartition> partitions) { 610 lastReadTime = System.currentTimeMillis(); 611 boolean isSpare = partitions.isEmpty(); 612 setThreadName("rebalance assigned"); 613 // reset the context 614 this.context = new ComputationContextImpl(streamManager, metadata, policy, partitions.isEmpty()); 615 log.debug(metadata.name() + ": Init isSpare=" + isSpare); 616 computation.init(context); 617 lastReadTime = System.currentTimeMillis(); 618 lastTimerExecution = 0; 619 assignmentLatch.countDown(); 620 // what about watermark ? 621 } 622}