001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import java.time.Duration; 022import java.util.Collection; 023import java.util.LinkedHashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.TimeUnit; 028import java.util.function.Supplier; 029import java.util.stream.Collectors; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.lib.stream.codec.Codec; 034import org.nuxeo.lib.stream.computation.Computation; 035import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 036import org.nuxeo.lib.stream.computation.Record; 037import org.nuxeo.lib.stream.computation.Watermark; 038import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl; 039import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval; 040import org.nuxeo.lib.stream.log.LogAppender; 041import org.nuxeo.lib.stream.log.LogManager; 042import org.nuxeo.lib.stream.log.LogPartition; 043import org.nuxeo.lib.stream.log.LogRecord; 044import org.nuxeo.lib.stream.log.LogTailer; 045import org.nuxeo.lib.stream.log.RebalanceException; 046import org.nuxeo.lib.stream.log.RebalanceListener; 047 048/** 049 * Thread driving a Computation 050 * 051 * @since 9.3 052 */ 053@SuppressWarnings("EmptyMethod") 054public class ComputationRunner implements Runnable, RebalanceListener { 055 public static final Duration READ_TIMEOUT = Duration.ofMillis(25); 056 057 protected static final long STARVING_TIMEOUT_MS = 1000; 058 059 protected static final long INACTIVITY_BREAK_MS = 100; 060 061 private static final Log log = LogFactory.getLog(ComputationRunner.class); 062 063 protected final LogManager logManager; 064 065 protected final ComputationMetadataMapping metadata; 066 067 protected final LogTailer<Record> tailer; 068 069 protected final Supplier<Computation> supplier; 070 071 protected final CountDownLatch assignmentLatch = new CountDownLatch(1); 072 073 protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval(); 074 075 protected final Codec<Record> inputCodec; 076 077 protected final Codec<Record> outputCodec; 078 079 protected ComputationContextImpl context; 080 081 protected volatile boolean stop; 082 083 protected volatile boolean drain; 084 085 protected Computation computation; 086 087 protected long counter; 088 089 protected long inRecords; 090 091 protected long inCheckpointRecords; 092 093 protected long outRecords; 094 095 protected long lastReadTime = System.currentTimeMillis(); 096 097 protected long lastTimerExecution; 098 099 protected String threadName; 100 101 @SuppressWarnings("unchecked") 102 public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 103 List<LogPartition> defaultAssignment, LogManager logManager, Codec<Record> inputCodec, 104 Codec<Record> outputCodec) { 105 this.supplier = supplier; 106 this.metadata = metadata; 107 this.logManager = logManager; 108 this.context = new ComputationContextImpl(metadata); 109 this.inputCodec = inputCodec; 110 this.outputCodec = outputCodec; 111 if (metadata.inputStreams().isEmpty()) { 112 this.tailer = null; 113 assignmentLatch.countDown(); 114 } else if (logManager.supportSubscribe()) { 115 this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this, inputCodec); 116 } else { 117 this.tailer = logManager.createTailer(metadata.name(), defaultAssignment, inputCodec); 118 assignmentLatch.countDown(); 119 } 120 } 121 122 public void stop() { 123 log.debug(metadata.name() + ": Receives Stop signal"); 124 stop = true; 125 if (computation != null) { 126 computation.signalStop(); 127 } 128 } 129 130 public void drain() { 131 log.debug(metadata.name() + ": Receives Drain signal"); 132 drain = true; 133 } 134 135 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 136 if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 137 log.warn(metadata.name() + ": Timeout waiting for assignment"); 138 return false; 139 } 140 return true; 141 } 142 143 @Override 144 public void run() { 145 threadName = Thread.currentThread().getName(); 146 boolean interrupted = false; 147 computation = supplier.get(); 148 log.debug(metadata.name() + ": Init"); 149 computation.init(context); 150 log.debug(metadata.name() + ": Start"); 151 try { 152 processLoop(); 153 } catch (InterruptedException e) { 154 interrupted = true; // Thread.currentThread().interrupt() in finally 155 // this is expected when the pool is shutdownNow 156 String msg = metadata.name() + ": Interrupted"; 157 if (log.isTraceEnabled()) { 158 log.debug(msg, e); 159 } else { 160 log.debug(msg); 161 } 162 } catch (Exception e) { 163 if (Thread.currentThread().isInterrupted()) { 164 // this can happen when pool is shutdownNow throwing ClosedByInterruptException 165 log.info(metadata.name() + ": Interrupted", e); 166 // TODO: test if we should set interrupted = true to rearm the interrupt flag 167 } else { 168 log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e); 169 throw e; 170 } 171 } finally { 172 try { 173 computation.destroy(); 174 closeTailer(); 175 log.debug(metadata.name() + ": Exited"); 176 } finally { 177 if (interrupted) { 178 Thread.currentThread().interrupt(); 179 } 180 } 181 } 182 } 183 184 protected void closeTailer() { 185 if (tailer != null && !tailer.closed()) { 186 tailer.close(); 187 } 188 } 189 190 protected void processLoop() throws InterruptedException { 191 boolean activity; 192 while (continueLoop()) { 193 activity = processTimer(); 194 activity |= processRecord(); 195 counter++; 196 if (!activity) { 197 // no timer nor record to process, take a break 198 Thread.sleep(INACTIVITY_BREAK_MS); 199 } 200 } 201 } 202 203 protected boolean continueLoop() { 204 if (stop || Thread.currentThread().isInterrupted()) { 205 return false; 206 } else if (drain) { 207 long now = System.currentTimeMillis(); 208 // for a source we take lastTimerExecution starvation 209 if (metadata.inputStreams().isEmpty()) { 210 if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) { 211 log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago"); 212 return false; 213 } 214 } else { 215 if ((now - lastReadTime) > STARVING_TIMEOUT_MS) { 216 log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, " 217 + inRecords + " records read, " + counter + " reads attempt"); 218 return false; 219 } 220 } 221 } 222 return true; 223 } 224 225 protected boolean processTimer() { 226 Map<String, Long> timers = context.getTimers(); 227 if (timers.isEmpty()) { 228 return false; 229 } 230 if (tailer != null && tailer.assignments().isEmpty()) { 231 // needed to ensure single source across multiple nodes 232 return false; 233 } 234 long now = System.currentTimeMillis(); 235 final boolean[] timerUpdate = { false }; 236 // filter and order timers 237 LinkedHashMap<String, Long> sortedTimer = timers.entrySet() 238 .stream() 239 .filter(entry -> entry.getValue() <= now) 240 .sorted(Map.Entry.comparingByValue()) 241 .collect( 242 Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 243 (e1, e2) -> e1, LinkedHashMap::new)); 244 sortedTimer.forEach((key, value) -> { 245 context.removeTimer(key); 246 computation.processTimer(context, key, value); 247 timerUpdate[0] = true; 248 }); 249 if (timerUpdate[0]) { 250 checkSourceLowWatermark(); 251 lastTimerExecution = now; 252 setThreadName("timer"); 253 checkpointIfNecessary(); 254 if (context.requireTerminate()) { 255 stop = true; 256 } 257 return true; 258 } 259 return false; 260 } 261 262 protected boolean processRecord() throws InterruptedException { 263 if (context.requireTerminate()) { 264 stop = true; 265 return true; 266 } 267 if (tailer == null) { 268 return false; 269 } 270 Duration timeoutRead = getTimeoutDuration(); 271 LogRecord<Record> logRecord = null; 272 try { 273 logRecord = tailer.read(timeoutRead); 274 } catch (RebalanceException e) { 275 // the revoke has done a checkpoint we can continue 276 } 277 Record record; 278 if (logRecord != null) { 279 record = logRecord.message(); 280 lastReadTime = System.currentTimeMillis(); 281 inRecords++; 282 lowWatermark.mark(record.getWatermark()); 283 String from = metadata.reverseMap(logRecord.offset().partition().name()); 284 computation.processRecord(context, from, record); 285 checkRecordFlags(record); 286 checkSourceLowWatermark(); 287 setThreadName("record"); 288 checkpointIfNecessary(); 289 return true; 290 } 291 return false; 292 } 293 294 protected Duration getTimeoutDuration() { 295 // Adapt the duration so we are not throttling when one of the input stream is empty 296 return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime)); 297 } 298 299 protected void checkSourceLowWatermark() { 300 long watermark = context.getSourceLowWatermark(); 301 if (watermark > 0) { 302 lowWatermark.mark(Watermark.ofValue(watermark)); 303 context.setSourceLowWatermark(0); 304 } 305 } 306 307 protected void checkRecordFlags(Record record) { 308 if (record.getFlags().contains(Record.Flag.POISON_PILL)) { 309 log.info(metadata.name() + ": Receive POISON PILL"); 310 context.askForCheckpoint(); 311 stop = true; 312 } else if (record.getFlags().contains(Record.Flag.COMMIT)) { 313 context.askForCheckpoint(); 314 } 315 } 316 317 protected void checkpointIfNecessary() { 318 if (context.requireCheckpoint()) { 319 boolean completed = false; 320 try { 321 checkpoint(); 322 completed = true; 323 inCheckpointRecords = inRecords; 324 } finally { 325 if (!completed) { 326 log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates."); 327 } 328 } 329 } 330 } 331 332 protected void checkpoint() { 333 sendRecords(); 334 saveTimers(); 335 saveState(); 336 // To Simulate slow checkpoint add a Thread.sleep(1) 337 saveOffsets(); 338 lowWatermark.checkpoint(); 339 context.removeCheckpointFlag(); 340 log.debug(metadata.name() + ": checkpoint"); 341 setThreadName("checkpoint"); 342 } 343 344 protected void saveTimers() { 345 // TODO: save timers in the key value store NXP-22112 346 } 347 348 protected void saveState() { 349 // TODO: save key value store NXP-22112 350 } 351 352 protected void saveOffsets() { 353 if (tailer != null) { 354 tailer.commit(); 355 } 356 } 357 358 protected void sendRecords() { 359 for (String stream : metadata.outputStreams()) { 360 LogAppender<Record> appender = logManager.getAppender(stream, outputCodec); 361 for (Record record : context.getRecords(stream)) { 362 if (record.getWatermark() == 0) { 363 // use low watermark when not set 364 record.setWatermark(lowWatermark.getLow().getValue()); 365 } 366 appender.append(record.getKey(), record); 367 outRecords++; 368 } 369 context.getRecords(stream).clear(); 370 } 371 } 372 373 public Watermark getLowWatermark() { 374 return lowWatermark.getLow(); 375 } 376 377 protected void setThreadName(String message) { 378 String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords 379 + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:" 380 + lowWatermark.getLow().getValue() + ",loop:" + counter; 381 if (message != null) { 382 name += "," + message; 383 } 384 Thread.currentThread().setName(name); 385 } 386 387 @Override 388 public void onPartitionsRevoked(Collection<LogPartition> partitions) { 389 setThreadName("rebalance revoked"); 390 } 391 392 @Override 393 public void onPartitionsAssigned(Collection<LogPartition> partitions) { 394 lastReadTime = System.currentTimeMillis(); 395 setThreadName("rebalance assigned"); 396 // reset the context 397 this.context = new ComputationContextImpl(metadata); 398 log.debug(metadata.name() + ": Init"); 399 computation.init(context); 400 lastReadTime = System.currentTimeMillis(); 401 lastTimerExecution = 0; 402 assignmentLatch.countDown(); 403 // what about watermark ? 404 } 405}