/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.pattern.consumer.internals;

import static java.lang.Thread.currentThread;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.pattern.Message;
import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
import org.nuxeo.lib.stream.pattern.consumer.Consumer;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerStatus;

import io.dropwizard.metrics5.Counter;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.dropwizard.metrics5.Timer;

import net.jodah.failsafe.Execution;

/**
 * Read messages from a tailer and drive a consumer according to its policy.
 *
 * @since 9.1
 */
public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, RebalanceListener {
    private static final Log log = LogFactory.getLog(ConsumerRunner.class);

    // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";

    protected final ConsumerFactory<M> factory;

    protected final ConsumerPolicy policy;

    protected final LogTailer<M> tailer;

    protected String consumerId;

    // Batch policy used for the next batch; switched to NO_BATCH while a failed batch is being retried
    protected BatchPolicy currentBatchPolicy;

    // Name of the thread at the time call() started, used as prefix by setThreadName
    protected String threadName;

    protected Consumer<M> consumer;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);

    protected long acceptCounter;

    protected long committedCounter;

    protected long batchCommitCounter;

    protected long batchFailureCounter;

    protected boolean alreadySalted;

    // Metrics global to all threads, having metrics per thread is way too much
    protected Timer globalAcceptTimer;

    protected Counter globalCommittedCounter;

    protected Timer globalBatchCommitTimer;

    protected Counter globalBatchFailureCounter;

    protected Counter globalConsumersCounter;

    /**
     * @deprecated since 11.1, due to serialization issue with java 11, use
     *             {@link #ConsumerRunner(ConsumerFactory, ConsumerPolicy, LogManager, Codec, List)} which allows to
     *             give a {@link org.nuxeo.lib.stream.codec.Codec codec} to {@link org.nuxeo.lib.stream.log.LogTailer
     *             tailer}.
     */
    @Deprecated
    @SuppressWarnings("unchecked")
    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager,
            List<LogPartition> defaultAssignments) {
        this(factory, policy, manager, NO_CODEC, defaultAssignments);
    }

    /**
     * Creates a runner: builds the tailer, registers the global consumers counter and positions the tailer according
     * to the policy start offset.
     *
     * @param factory used by {@link #call()} to create the consumer
     * @param policy the consumer policy (batch policy, retry policy, start offset, ...)
     * @param manager the log manager used to create or subscribe the tailer
     * @param codec the codec given to the tailer
     * @param defaultAssignments the partitions to tail; in subscribe mode only their names are used
     * @throws UnsupportedOperationException if the manager supports subscribe and the policy start offset is not
     *             {@code LAST_COMMITTED}
     */
    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, Codec<M> codec,
            List<LogPartition> defaultAssignments) {
        this.factory = factory;
        this.currentBatchPolicy = policy.getBatchPolicy();
        this.policy = policy;
        this.tailer = createTailer(manager, codec, defaultAssignments);
        consumerId = tailer.toString();
        globalConsumersCounter = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "consumers"));
        setTailerPosition(manager);
        if (log.isDebugEnabled()) {
            log.debug("Consumer thread created tailing on: " + consumerId);
        }
    }

    /**
     * Creates the tailer: a subscription on the log names when the manager supports subscribe (this runner being
     * registered as the rebalance listener), otherwise a tailer on the given partitions.
     */
    protected LogTailer<M> createTailer(LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments) {
        Name group = Name.ofUrn(policy.getName());
        if (manager.supportSubscribe()) {
            Set<Name> names = defaultAssignments.stream().map(LogPartition::name).collect(Collectors.toSet());
            return manager.subscribe(group, names, this, codec);
        }
        return manager.createTailer(group, defaultAssignments, codec);
    }

    /**
     * Creates the consumer, runs the consumer loop until completion and returns the counters as a status.
     * <p>
     * The global consumers counter is always decremented and the tailer always closed, even when the consumer cannot
     * be created or fails to close.
     *
     * @return the status holding the accept/commit/batch counters and the start and stop timestamps
     * @throws Exception any failure raised by the consumer creation, the consumer loop or the consumer close
     */
    @Override
    public ConsumerStatus call() throws Exception {
        threadName = currentThread().getName();
        setMetrics();
        globalConsumersCounter.inc();
        long start = System.currentTimeMillis();
        try {
            // created inside the try so that a creation failure still releases the counter and the tailer
            consumer = factory.createConsumer(consumerId);
            consumerLoop();
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                // must run even if consumer.close() throws
                globalConsumersCounter.dec();
                tailer.close();
            }
        }
        return new ConsumerStatus(consumerId, acceptCounter, committedCounter, batchCommitCounter, batchFailureCounter,
                start, System.currentTimeMillis(), false);
    }

    /**
     * Registers (or retrieves) the global timers and counters in the shared registry.
     */
    protected void setMetrics() {
        globalAcceptTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "accepted"));
        globalCommittedCounter = registry.counter(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "committed"));
        globalBatchFailureCounter = registry.counter(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchFailure"));
        globalBatchCommitTimer = registry.timer(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchCommit"));
    }

    /**
     * Sleeps once, for a random delay below the batch time threshold, when the policy is salted.
     * <p>
     * Subsequent invocations are no-ops.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void addSalt() throws InterruptedException {
        if (alreadySalted) {
            return;
        }
        // this random delay prevent consumers to be too much synchronized
        if (policy.isSalted()) {
            long maxDelayMs = policy.getBatchPolicy().getTimeThreshold().toMillis();
            // nextLong(bound) rejects a bound <= 0, a threshold below 1ms means there is nothing to wait for
            if (maxDelayMs > 0) {
                Thread.sleep(ThreadLocalRandom.current().nextLong(maxDelayMs));
            }
        }
        alreadySalted = true;
    }

    /**
     * Moves the tailer to the start offset requested by the policy.
     *
     * @throws UnsupportedOperationException if the manager supports subscribe and the start offset is not
     *             {@code LAST_COMMITTED}
     */
    protected void setTailerPosition(LogManager manager) {
        ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset();
        if (manager.supportSubscribe() && seekPosition != ConsumerPolicy.StartOffset.LAST_COMMITTED) {
            throw new UnsupportedOperationException(
                    "Tailer startOffset to " + seekPosition + " is not supported in subscribe mode");
        }
        switch (seekPosition) {
        case BEGIN:
            tailer.toStart();
            break;
        case END:
            tailer.toEnd();
            break;
        default:
            tailer.toLastCommitted();
        }
    }

    /**
     * Processes batches, each with a fresh retry execution, until a batch reports the end or a failure is recorded
     * while the policy does not continue on failure.
     */
    protected void consumerLoop() throws InterruptedException {
        boolean end = false;
        while (!end) {
            Execution execution = new Execution(policy.getRetryPolicy());
            end = processBatchWithRetry(execution);
            if (execution.getLastFailure() != null) {
                if (policy.continueOnFailure()) {
                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
                } else {
                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
                    end = true;
                }
            }
        }
    }

    /**
     * Processes one batch and commits the tailer, retrying as long as the execution allows it.
     * <p>
     * When a failure can be retried the batch policy is switched to {@link BatchPolicy#NO_BATCH} and the tailer is
     * moved back to the last committed position. The regular batch policy is restored on a rebalance and, in all
     * cases, when leaving this method.
     *
     * @param execution the retry execution tracking the attempts for this batch
     * @return {@code true} when the processed batch is the last one
     * @throws InterruptedException if the thread is interrupted; the interrupt flag is set again before rethrowing
     */
    protected boolean processBatchWithRetry(Execution execution) throws InterruptedException {
        boolean end = false;
        try {
            while (!execution.isComplete()) {
                try {
                    end = processBatch();
                    tailer.commit();
                    execution.complete();
                } catch (Throwable t) {
                    globalBatchFailureCounter.inc();
                    batchFailureCounter += 1;
                    if (t instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                        throw t;
                    }
                    if (t instanceof RebalanceException) {
                        log.info("Rebalance");
                        // the current batch is rollback because of this exception
                        // we continue with the new tailer assignment, using the regular batch policy
                        restoreBatchPolicy();
                    } else if (execution.canRetryOn(t)) {
                        // keep the retry policy for the next iteration: restoring it here would cancel it
                        setBatchRetryPolicy();
                        tailer.toLastCommitted();
                    } else {
                        throw t;
                    }
                }
            }
        } finally {
            // never leak the retry policy to the next batch, whatever the outcome
            restoreBatchPolicy();
        }
        return end;
    }

    /**
     * Switches the current batch policy to {@link BatchPolicy#NO_BATCH}, used while retrying.
     */
    protected void setBatchRetryPolicy() {
        currentBatchPolicy = BatchPolicy.NO_BATCH;
    }

    /**
     * Sets the current batch policy back to the one defined by the consumer policy.
     */
    protected void restoreBatchPolicy() {
        currentBatchPolicy = policy.getBatchPolicy();
    }

    /**
     * Begins a batch on the consumer, accepts messages and commits the batch; on exception the batch is rolled back
     * and the initial exception is rethrown.
     *
     * @return {@code true} when the batch state is {@code LAST}
     */
    protected boolean processBatch() throws InterruptedException {
        boolean end = false;
        beginBatch();
        try {
            BatchState state = acceptBatch();
            commitBatch(state);
            if (state.getState() == BatchState.State.LAST) {
                log.info("No more message on tailer: " + tailer);
                end = true;
            }
        } catch (Exception e) {
            try {
                rollbackBatch(e);
            } catch (Exception rollbackException) {
                log.error("Exception on rollback invocation", rollbackException);
                // we propagate the initial error.
            }
            throw e;
        }
        return end;
    }

    /**
     * Notifies the consumer that a batch begins.
     */
    protected void beginBatch() {
        consumer.begin();
    }

    /**
     * Commits the batch on the consumer, timed by the global batch commit timer, and updates the commit counters
     * with the batch size.
     */
    protected void commitBatch(BatchState state) {
        try (Timer.Context ignore = globalBatchCommitTimer.time()) {
            consumer.commit();
            committedCounter += state.getSize();
            globalCommittedCounter.inc(state.getSize());
            batchCommitCounter += 1;
            if (log.isDebugEnabled()) {
                log.debug("Commit batch size: " + state.getSize() + ", total committed: " + committedCounter);
            }
        }
    }

    /**
     * Logs the cause and rolls back the batch on the consumer.
     *
     * @param e the exception that caused the rollback
     */
    protected void rollbackBatch(Exception e) {
        if (e instanceof RebalanceException) {
            log.warn("Rollback current batch because of consumer rebalancing");
        } else {
            log.warn("Rollback batch", e);
        }
        consumer.rollback();
    }

    /**
     * Reads records from the tailer and gives their messages to the consumer until the batch leaves the
     * {@code FILLING} state or no record is read within the policy wait timeout.
     * <p>
     * A poison pill message marks the batch as last without being given to the consumer; a message that forces the
     * batch is accepted and then ends the batch.
     *
     * @return the state of the batch, marked as last when the read timed out
     */
    protected BatchState acceptBatch() throws InterruptedException {
        BatchState batch = new BatchState(currentBatchPolicy);
        batch.start();
        LogRecord<M> record;
        M message;
        while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) {
            addSalt(); // do this here so kafka subscription happens concurrently
            message = record.message();
            if (message.poisonPill()) {
                log.warn("Receive a poison pill: " + message);
                batch.last();
            } else {
                try (Timer.Context ignore = globalAcceptTimer.time()) {
                    setThreadName(message.getId());
                    consumer.accept(message);
                    acceptCounter += 1;
                }
                batch.inc();
                if (message.forceBatch()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Force end of batch: " + message);
                    }
                    batch.force();
                }
            }
            if (batch.getState() != BatchState.State.FILLING) {
                return batch;
            }
        }
        batch.last();
        log.info(String.format("No record after: %ds on %s, terminating", policy.getWaitMessageTimeout().getSeconds(),
                consumerId));
        return batch;
    }

    /**
     * Renames the current thread to {@code <initial thread name>-<accept counter>-<message>}.
     */
    protected void setThreadName(String message) {
        String name = threadName + "-" + acceptCounter + "-" + message;
        currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
        setThreadName("rebalance-revoked");
    }

    @Override
    public void onPartitionsLost(Collection<LogPartition> partitions) {
        setThreadName("rebalance-LOST");
    }

    @Override
    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
        // the tailer description changes with the assignment, refresh the consumer id accordingly
        consumerId = tailer.toString();
        setThreadName("rebalance-assigned-" + tailer.toString());
    }
}