/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

/**
 * Thread driving a Computation
 *
 * @since 9.3
 */
052@SuppressWarnings("EmptyMethod") 053public class ComputationRunner implements Runnable, RebalanceListener { 054 public static final Duration READ_TIMEOUT = Duration.ofMillis(25); 055 056 protected static final long STARVING_TIMEOUT_MS = 1000; 057 058 protected static final long INACTIVITY_BREAK_MS = 100; 059 060 private static final Log log = LogFactory.getLog(ComputationRunner.class); 061 062 protected final LogManager logManager; 063 064 protected final ComputationMetadataMapping metadata; 065 066 protected final LogTailer<Record> tailer; 067 068 protected final Supplier<Computation> supplier; 069 070 protected final CountDownLatch assignmentLatch = new CountDownLatch(1); 071 072 protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval(); 073 074 protected ComputationContextImpl context; 075 076 protected volatile boolean stop; 077 078 protected volatile boolean drain; 079 080 protected Computation computation; 081 082 protected long counter; 083 084 protected long inRecords; 085 086 protected long inCheckpointRecords; 087 088 protected long outRecords; 089 090 protected long lastReadTime = System.currentTimeMillis(); 091 092 protected long lastTimerExecution; 093 094 protected String threadName; 095 096 @SuppressWarnings("unchecked") 097 public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 098 List<LogPartition> defaultAssignment, LogManager logManager) { 099 this.supplier = supplier; 100 this.metadata = metadata; 101 this.logManager = logManager; 102 this.context = new ComputationContextImpl(metadata); 103 if (metadata.inputStreams().isEmpty()) { 104 this.tailer = null; 105 assignmentLatch.countDown(); 106 } else if (logManager.supportSubscribe()) { 107 this.tailer = logManager.subscribe(metadata.name(), metadata.inputStreams(), this); 108 } else { 109 this.tailer = logManager.createTailer(metadata.name(), defaultAssignment); 110 assignmentLatch.countDown(); 111 } 112 } 113 114 public 
void stop() { 115 log.debug(metadata.name() + ": Receives Stop signal"); 116 stop = true; 117 } 118 119 public void drain() { 120 log.debug(metadata.name() + ": Receives Drain signal"); 121 drain = true; 122 } 123 124 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 125 if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 126 log.warn(metadata.name() + ": Timeout waiting for assignment"); 127 return false; 128 } 129 return true; 130 } 131 132 @Override 133 public void run() { 134 threadName = Thread.currentThread().getName(); 135 boolean interrupted = false; 136 computation = supplier.get(); 137 log.debug(metadata.name() + ": Init"); 138 computation.init(context); 139 log.debug(metadata.name() + ": Start"); 140 try { 141 processLoop(); 142 } catch (InterruptedException e) { 143 interrupted = true; // Thread.currentThread().interrupt() in finally 144 // this is expected when the pool is shutdownNow 145 if (log.isTraceEnabled()) { 146 log.debug(metadata.name() + ": Interrupted", e); 147 } else { 148 log.debug(metadata.name() + ": Interrupted"); 149 } 150 } catch (Exception e) { 151 if (Thread.currentThread().isInterrupted()) { 152 // this can happen when pool is shutdownNow throwing ClosedByInterruptException 153 log.info(metadata.name() + ": Interrupted", e); 154 // TODO: test if we should set interrupted = true to rearm the interrupt flag 155 } else { 156 log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e); 157 throw e; 158 } 159 } finally { 160 try { 161 computation.destroy(); 162 closeTailer(); 163 log.debug(metadata.name() + ": Exited"); 164 } finally { 165 if (interrupted) { 166 Thread.currentThread().interrupt(); 167 } 168 } 169 } 170 } 171 172 protected void closeTailer() { 173 if (tailer != null && !tailer.closed()) { 174 tailer.close(); 175 } 176 } 177 178 protected void processLoop() throws InterruptedException { 179 boolean activity; 180 while (continueLoop()) { 181 activity 
= processTimer(); 182 activity |= processRecord(); 183 counter++; 184 if (!activity) { 185 // no timer nor record to process, take a break 186 Thread.sleep(INACTIVITY_BREAK_MS); 187 } 188 } 189 } 190 191 protected boolean continueLoop() { 192 if (stop || Thread.currentThread().isInterrupted()) { 193 return false; 194 } else if (drain) { 195 long now = System.currentTimeMillis(); 196 // for a source we take lastTimerExecution starvation 197 if (metadata.inputStreams().isEmpty()) { 198 if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) { 199 log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago"); 200 return false; 201 } 202 } else { 203 if ((now - lastReadTime) > STARVING_TIMEOUT_MS) { 204 log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, " 205 + inRecords + " records read, " + counter + " reads attempt"); 206 return false; 207 } 208 } 209 } 210 return true; 211 } 212 213 protected boolean processTimer() { 214 Map<String, Long> timers = context.getTimers(); 215 if (timers.isEmpty()) { 216 return false; 217 } 218 if (tailer != null && tailer.assignments().isEmpty()) { 219 // needed to ensure single source across multiple nodes 220 return false; 221 } 222 long now = System.currentTimeMillis(); 223 final boolean[] timerUpdate = { false }; 224 // filter and order timers 225 LinkedHashMap<String, Long> sortedTimer = timers.entrySet() 226 .stream() 227 .filter(entry -> entry.getValue() <= now) 228 .sorted(Map.Entry.comparingByValue()) 229 .collect( 230 Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 231 (e1, e2) -> e1, LinkedHashMap::new)); 232 sortedTimer.forEach((key, value) -> { 233 context.removeTimer(key); 234 computation.processTimer(context, key, value); 235 timerUpdate[0] = true; 236 }); 237 if (timerUpdate[0]) { 238 checkSourceLowWatermark(); 239 lastTimerExecution = now; 240 setThreadName("timer"); 241 checkpointIfNecessary(); 242 if 
(context.requireTerminate()) { 243 stop = true; 244 } 245 return true; 246 } 247 return false; 248 } 249 250 protected boolean processRecord() throws InterruptedException { 251 if (context.requireTerminate()) { 252 stop = true; 253 return true; 254 } 255 if (tailer == null) { 256 return false; 257 } 258 Duration timeoutRead = getTimeoutDuration(); 259 LogRecord<Record> logRecord = null; 260 try { 261 logRecord = tailer.read(timeoutRead); 262 } catch (RebalanceException e) { 263 // the revoke has done a checkpoint we can continue 264 } 265 Record record; 266 if (logRecord != null) { 267 record = logRecord.message(); 268 lastReadTime = System.currentTimeMillis(); 269 inRecords++; 270 lowWatermark.mark(record.watermark); 271 String from = metadata.reverseMap(logRecord.offset().partition().name()); 272 // System.out.println(metadata.name() + ": Receive from " + from + " record: " + record); 273 computation.processRecord(context, from, record); 274 checkRecordFlags(record); 275 checkSourceLowWatermark(); 276 setThreadName("record"); 277 checkpointIfNecessary(); 278 return true; 279 } 280 return false; 281 } 282 283 protected Duration getTimeoutDuration() { 284 // Adapt the duration so we are not throttling when one of the input stream is empty 285 return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), System.currentTimeMillis() - lastReadTime)); 286 } 287 288 protected void checkSourceLowWatermark() { 289 long watermark = context.getSourceLowWatermark(); 290 if (watermark > 0) { 291 lowWatermark.mark(Watermark.ofValue(watermark)); 292 // System.out.println(metadata.name() + ": Set source wm " + lowWatermark); 293 context.setSourceLowWatermark(0); 294 } 295 } 296 297 protected void checkRecordFlags(Record record) { 298 if (record.flags.contains(Record.Flag.POISON_PILL)) { 299 log.info(metadata.name() + ": Receive POISON PILL"); 300 context.askForCheckpoint(); 301 stop = true; 302 } else if (record.flags.contains(Record.Flag.COMMIT)) { 303 context.askForCheckpoint(); 
304 } 305 } 306 307 protected void checkpointIfNecessary() { 308 if (context.requireCheckpoint()) { 309 boolean completed = false; 310 try { 311 checkpoint(); 312 completed = true; 313 inCheckpointRecords = inRecords; 314 } finally { 315 if (!completed) { 316 log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates."); 317 } 318 } 319 } 320 } 321 322 protected void checkpoint() { 323 sendRecords(); 324 saveTimers(); 325 saveState(); 326 // Simulate slow checkpoint 327 // try { 328 // Thread.sleep(1); 329 // } catch (InterruptedException e) { 330 // Thread.currentThread().interrupt(); 331 // throw e; 332 // } 333 saveOffsets(); 334 lowWatermark.checkpoint(); 335 // System.out.println("checkpoint " + metadata.name() + " " + lowWatermark); 336 context.removeCheckpointFlag(); 337 log.debug(metadata.name() + ": checkpoint"); 338 setThreadName("checkpoint"); 339 } 340 341 protected void saveTimers() { 342 // TODO: save timers in the key value store NXP-22112 343 } 344 345 protected void saveState() { 346 // TODO: save key value store NXP-22112 347 } 348 349 protected void saveOffsets() { 350 if (tailer != null) { 351 tailer.commit(); 352 } 353 } 354 355 protected void sendRecords() { 356 for (String stream : metadata.outputStreams()) { 357 LogAppender<Record> appender = logManager.getAppender(stream); 358 for (Record record : context.getRecords(stream)) { 359 // System.out.println(metadata.name() + " send record to " + stream + " lowWatermark " + lowWatermark); 360 if (record.watermark == 0) { 361 // use low watermark when not set 362 record.watermark = lowWatermark.getLow().getValue(); 363 } 364 appender.append(record.key, record); 365 outRecords++; 366 } 367 context.getRecords(stream).clear(); 368 } 369 } 370 371 public Watermark getLowWatermark() { 372 return lowWatermark.getLow(); 373 } 374 375 protected void setThreadName(String message) { 376 String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + 
outRecords 377 + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:" 378 + lowWatermark.getLow().getValue() + ",loop:" + counter; 379 if (message != null) { 380 name += "," + message; 381 } 382 Thread.currentThread().setName(name); 383 } 384 385 @Override 386 public void onPartitionsRevoked(Collection<LogPartition> partitions) { 387 setThreadName("rebalance revoked"); 388 } 389 390 @Override 391 public void onPartitionsAssigned(Collection<LogPartition> partitions) { 392 lastReadTime = System.currentTimeMillis(); 393 setThreadName("rebalance assigned"); 394 // reset the context 395 this.context = new ComputationContextImpl(metadata); 396 log.debug(metadata.name() + ": Init"); 397 computation.init(context); 398 lastReadTime = System.currentTimeMillis(); 399 lastTimerExecution = 0; 400 assignmentLatch.countDown(); 401 // what about watermark ? 402 } 403}