/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Computation;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
import org.nuxeo.ecm.platform.importer.mqueues.computation.internals.ComputationContextImpl;
import org.nuxeo.ecm.platform.importer.mqueues.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Thread driving a Computation.
 * <p>
 * The runner obtains a {@link Computation} from a supplier, initializes it, then loops executing the timers that
 * are due and reading at most one input record per iteration until it is stopped, drained or interrupted. Output
 * records, offsets and the low watermark are flushed whenever the computation context requires a checkpoint.
 *
 * @since 9.2
 */
public class MQComputationRunner implements Runnable, MQRebalanceListener {
    private static final Log log = LogFactory.getLog(MQComputationRunner.class);

    /** In drain mode, the loop ends once nothing has been read (or no timer has run) for longer than this. */
    private static final long STARVING_TIMEOUT_MS = 1000;

    /** Upper bound for a single read on the tailer. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

    // replaced by a fresh instance on each partition assignment, see onPartitionsAssigned
    private ComputationContextImpl context;
    private final MQManager<Record> mqManager;
    private final ComputationMetadataMapping metadata;
    // null when the computation has no input stream
    private final MQTailer<Record> tailer;
    private final Supplier<Computation> supplier;

    private volatile boolean stop = false;
    private volatile boolean drain = false;
    private CountDownLatch assignmentLatch = new CountDownLatch(1);

    private Computation computation;
    // number of loop iterations
    private long counter = 0;
    private long inRecords = 0;
    // value of inRecords at the last successful checkpoint
    private long inCheckpointRecords = 0;
    private long outRecords = 0;
    private final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
    private long lastReadTime = System.currentTimeMillis();
    private long lastTimerExecution = 0;
    // thread name captured when run() starts, used as prefix by setThreadName
    private String threadName;

    /**
     * Creates a runner and sets up its tailer.
     * <p>
     * Without input stream there is no tailer. When the manager supports subscribe the runner subscribes to the
     * input streams and registers itself as rebalance listener, otherwise a tailer is created on the
     * {@code defaultAssignment}. In the two non-subscribe cases the assignment is considered done immediately.
     *
     * @param supplier provides the computation instance when {@link #run()} starts
     * @param metadata the computation metadata (name, input and output streams)
     * @param defaultAssignment partitions to tail when the manager does not support subscribe
     * @param mqManager the manager used to create the tailer and the appenders
     */
    @SuppressWarnings("unchecked")
    public MQComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
                               List<MQPartition> defaultAssignment, MQManager<Record> mqManager) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.mqManager = mqManager;
        this.context = new ComputationContextImpl(metadata);
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            assignmentLatch.countDown();
        } else if (mqManager.supportSubscribe()) {
            // the latch is released later by onPartitionsAssigned
            this.tailer = mqManager.subscribe(metadata.name(), metadata.inputStreams(), this);
        } else {
            this.tailer = mqManager.createTailer(metadata.name(), defaultAssignment);
            assignmentLatch.countDown();
        }
    }

    /**
     * Asks the processing loop to exit at its next iteration.
     */
    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
    }

    /**
     * Asks the processing loop to exit once it has been starving for more than {@code STARVING_TIMEOUT_MS}.
     */
    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

    /**
     * Waits until the assignment latch is released.
     *
     * @param timeout the maximum time to wait
     * @return {@code true} if the assignment happened within the timeout, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
            return false;
        }
        return true;
    }

    /**
     * Initializes the computation, runs the processing loop, then destroys the computation and closes the tailer.
     * <p>
     * The tailer is closed even when the computation cannot be created or initialized, and even when
     * {@code destroy()} throws. The interrupt flag is restored only once the tailer is closed.
     */
    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        boolean initialized = false;
        try {
            computation = supplier.get();
            log.debug(metadata.name() + ": Init");
            computation.init(context);
            initialized = true;
        } finally {
            if (!initialized) {
                // the constructor may already have opened a tailer, do not leak it
                closeTailer();
            }
        }
        log.debug(metadata.name() + ": Start");
        try {
            processLoop();
        } catch (InterruptedException e) {
            // this is expected when the pool is shutdownNow
            if (log.isTraceEnabled()) {
                log.debug(metadata.name() + ": Interrupted", e);
            } else {
                log.debug(metadata.name() + ": Interrupted");
            }
            // the interrupt flag is set after the tailer are closed
            interrupted = true;
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                // this can happen when pool is shutdownNow throwing ClosedByInterruptException
                log.info(metadata.name() + ": Interrupted", e);
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                try {
                    computation.destroy();
                } finally {
                    // must run even if destroy() fails
                    closeTailer();
                }
                log.debug(metadata.name() + ": Exited");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes the tailer if there is one and it is not already closed; a failure to close is only logged.
     */
    private void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            try {
                tailer.close();
            } catch (Exception e) {
                log.debug(metadata.name() + ": Exception while closing tailer", e);
            }
        }
    }

    /**
     * Main loop: executes due timers then processes at most one record, until {@link #continueLoop()} is false.
     */
    private void processLoop() throws InterruptedException {
        while (continueLoop()) {
            processTimer();
            processRecord();
            counter++;
            // TODO: add pause for computation without inputs or without timer to prevent CPU hogs
        }
    }

    /**
     * Tells whether the loop should go on.
     *
     * @return {@code false} when stopped or interrupted, or in drain mode when the last timer execution (for a
     *         computation without input) or the last read is older than {@code STARVING_TIMEOUT_MS}
     */
    private boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        } else if (drain) {
            long now = System.currentTimeMillis();
            // for a source we take lastTimerExecution starvation
            if (metadata.inputStreams().isEmpty()) {
                if (lastTimerExecution > 0 && (now - lastTimerExecution) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
                    return false;
                }
            } else {
                if ((now - lastReadTime) > STARVING_TIMEOUT_MS) {
                    log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) +
                            " ms, " + inRecords + " records read, " + counter + " reads attempt");
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Executes the timers whose time is less than or equal to now, in ascending time order, then checkpoints if
     * the context requires it. Each timer is removed from the context before being executed.
     */
    private void processTimer() throws InterruptedException {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        // filter and order timers
        LinkedHashMap<String, Long> dueTimers = timers.entrySet().stream()
                .filter(entry -> entry.getValue() <= now)
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        if (dueTimers.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Long> timer : dueTimers.entrySet()) {
            context.removeTimer(timer.getKey());
            computation.processTimer(context, timer.getKey(), timer.getValue());
        }
        checkSourceLowWatermark();
        lastTimerExecution = now;
        setThreadName("timer");
        checkpointIfNecessary();
    }

    /**
     * Reads at most one record from the tailer and submits it to the computation, then checkpoints if the
     * context requires it. Does nothing when there is no tailer or when no record is read.
     */
    private void processRecord() throws InterruptedException {
        if (tailer == null) {
            return;
        }
        Duration timeoutRead = getTimeoutDuration();
        MQRecord<Record> mqRecord = null;
        try {
            mqRecord = tailer.read(timeoutRead);
        } catch (MQRebalanceException ignored) {
            // the revoke has done a checkpoint we can continue
        }
        if (mqRecord == null) {
            return;
        }
        Record record = mqRecord.message();
        lastReadTime = System.currentTimeMillis();
        inRecords++;
        lowWatermark.mark(record.watermark);
        String from = metadata.reverseMap(mqRecord.partition().name());
        computation.processRecord(context, from, record);
        checkRecordFlags(record);
        checkSourceLowWatermark();
        setThreadName("record");
        checkpointIfNecessary();
    }

    /**
     * Computes the read timeout: the time elapsed since the last read, capped by {@link #READ_TIMEOUT} and never
     * negative.
     */
    private Duration getTimeoutDuration() {
        // Adapt the duration so we are not throttling when one of the input stream is empty
        long sinceLastRead = System.currentTimeMillis() - lastReadTime;
        // the wall clock can step backwards, never hand a negative timeout to the tailer
        // NOTE(review): how MQTailer.read treats a negative timeout is not visible here
        long timeoutMs = Math.max(0L, Math.min(READ_TIMEOUT.toMillis(), sinceLastRead));
        return Duration.ofMillis(timeoutMs);
    }

    /**
     * Transfers a positive source low watermark from the context to the runner's low watermark and resets it to
     * zero in the context.
     */
    private void checkSourceLowWatermark() {
        long watermark = context.getSourceLowWatermark();
        if (watermark > 0) {
            lowWatermark.mark(Watermark.ofValue(watermark));
            context.setSourceLowWatermark(0);
        }
    }

    /**
     * Handles record flags: a poison pill asks for a checkpoint and stops the runner, a commit flag asks for a
     * checkpoint.
     */
    private void checkRecordFlags(Record record) {
        if (record.flags.contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receive POISON PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.flags.contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    /**
     * Runs a checkpoint when the context requires one; a checkpoint that does not complete is logged as an error
     * and its exception propagates.
     */
    private void checkpointIfNecessary() throws InterruptedException {
        if (context.requireCheckpoint()) {
            boolean completed = false;
            try {
                checkpoint();
                completed = true;
                inCheckpointRecords = inRecords;
            } finally {
                if (!completed) {
                    log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
                }
            }
        }
    }

    /**
     * Checkpoint sequence: send the pending output records, save timers and state, commit the input offsets,
     * checkpoint the low watermark and clear the checkpoint flag of the context.
     */
    private void checkpoint() throws InterruptedException {
        sendRecords();
        saveTimers();
        saveState();
        saveOffsets();
        lowWatermark.checkpoint();
        context.removeCheckpointFlag();
        log.debug(metadata.name() + ": checkpoint");
        setThreadName("checkpoint");
    }

    private void saveTimers() {
        // TODO save timers in the key value store
    }

    private void saveState() {
        // TODO save key value store
    }

    /**
     * Commits the tailer position, if there is a tailer.
     */
    private void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
        }
    }

    /**
     * Appends the records pending in the context to each output stream, then clears them. A record without
     * watermark (value 0) gets the current low watermark.
     */
    private void sendRecords() {
        for (String ostream : metadata.outputStreams()) {
            MQAppender<Record> appender = mqManager.getAppender(ostream);
            for (Record record : context.getRecords(ostream)) {
                if (record.watermark == 0) {
                    // use low watermark when not set
                    record.watermark = lowWatermark.getLow().getValue();
                }
                appender.append(record.key, record);
                outRecords++;
            }
            context.getRecords(ostream).clear();
        }
    }

    /**
     * Returns the low bound of the runner's watermark interval.
     */
    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    /**
     * Renames the current thread to expose the runner counters, optionally followed by a message.
     *
     * @param message a suffix describing the current activity, ignored when {@code null}
     */
    private void setThreadName(String message) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords +
                ",lastRead:" + lastReadTime +
                ",lastTimer:" + lastTimerExecution + ",wm:" + lowWatermark.getLow().getValue() +
                ",loop:" + counter;
        if (message != null) {
            name += "," + message;
        }
        Thread.currentThread().setName(name);
    }

    /**
     * Rebalance callback: only updates the thread name.
     */
    @Override
    public void onPartitionsRevoked(Collection<MQPartition> partitions) {
        setThreadName("rebalance revoked");
    }

    /**
     * Rebalance callback: replaces the context with a fresh one, initializes the computation again with it,
     * resets the read and timer clocks and releases the assignment latch.
     */
    @Override
    public void onPartitionsAssigned(Collection<MQPartition> partitions) {
        lastReadTime = System.currentTimeMillis();
        setThreadName("rebalance assigned");
        // reset the context
        this.context = new ComputationContextImpl(metadata);
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0;
        assignmentLatch.countDown();
        // what about watermark ?
    }
}