001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.pattern.consumer.internals; 020 021import static java.lang.Thread.currentThread; 022 023import java.util.Collection; 024import java.util.List; 025import java.util.Set; 026import java.util.concurrent.Callable; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.stream.Collectors; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.lib.stream.log.LogManager; 033import org.nuxeo.lib.stream.log.LogPartition; 034import org.nuxeo.lib.stream.log.LogRecord; 035import org.nuxeo.lib.stream.log.LogTailer; 036import org.nuxeo.lib.stream.log.RebalanceException; 037import org.nuxeo.lib.stream.log.RebalanceListener; 038import org.nuxeo.lib.stream.pattern.Message; 039import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy; 040import org.nuxeo.lib.stream.pattern.consumer.Consumer; 041import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory; 042import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 043import org.nuxeo.lib.stream.pattern.consumer.ConsumerStatus; 044 045import com.codahale.metrics.Counter; 046import com.codahale.metrics.MetricRegistry; 047import com.codahale.metrics.SharedMetricRegistries; 048import com.codahale.metrics.Timer; 049 050import net.jodah.failsafe.Execution; 051 052/** 053 * Read messages from a tailer and drive a consumer according to its policy. 054 * 055 * @since 9.1 056 */ 057public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, RebalanceListener { 058 private static final Log log = LogFactory.getLog(ConsumerRunner.class); 059 060 // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime 061 public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService"; 062 063 protected final ConsumerFactory<M> factory; 064 065 protected final ConsumerPolicy policy; 066 067 protected final LogTailer<M> tailer; 068 069 protected String consumerId; 070 071 protected BatchPolicy currentBatchPolicy; 072 073 protected String threadName; 074 075 protected Consumer<M> consumer; 076 077 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME); 078 079 protected long acceptCounter; 080 081 protected long committedCounter; 082 083 protected long batchCommitCounter; 084 085 protected long batchFailureCounter; 086 087 protected boolean alreadySalted; 088 089 // Metrics global to all threads, having metrics per thread is way too much 090 protected Timer globalAcceptTimer; 091 092 protected Counter globalCommittedCounter; 093 094 protected Timer globalBatchCommitTimer; 095 096 protected Counter globalBatchFailureCounter; 097 098 protected Counter globalConsumersCounter; 099 100 public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, 101 List<LogPartition> defaultAssignments) { 102 this.factory = factory; 103 this.currentBatchPolicy = policy.getBatchPolicy(); 104 this.policy = policy; 105 this.tailer = createTailer(manager, defaultAssignments); 106 consumerId = tailer.toString(); 107 globalConsumersCounter = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "consumers")); 108 setTailerPosition(manager); 109 log.debug("Consumer thread created tailing on: " + consumerId); 110 } 111 112 protected LogTailer<M> createTailer(LogManager manager, List<LogPartition> defaultAssignments) { 113 LogTailer<M> tailer; 114 if (manager.supportSubscribe()) { 115 Set<String> names = defaultAssignments.stream().map(LogPartition::name).collect(Collectors.toSet()); 116 tailer = manager.subscribe(policy.getName(), names, this); 117 } else { 118 tailer = manager.createTailer(policy.getName(), defaultAssignments); 119 } 120 return tailer; 121 } 122 123 @Override 124 public ConsumerStatus call() throws Exception { 125 threadName = currentThread().getName(); 126 setMetrics(); 127 globalConsumersCounter.inc(); 128 long start = System.currentTimeMillis(); 129 consumer = factory.createConsumer(consumerId); 130 try { 131 consumerLoop(); 132 } finally { 133 consumer.close(); 134 globalConsumersCounter.dec(); 135 tailer.close(); 136 } 137 return new ConsumerStatus(consumerId, acceptCounter, committedCounter, batchCommitCounter, batchFailureCounter, 138 start, System.currentTimeMillis(), false); 139 } 140 141 protected void setMetrics() { 142 globalAcceptTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "accepted")); 143 globalCommittedCounter = registry.counter( 144 MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "committed")); 145 globalBatchFailureCounter = registry.counter( 146 MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchFailure")); 147 globalBatchCommitTimer = registry.timer( 148 MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchCommit")); 149 } 150 151 protected void addSalt() throws InterruptedException { 152 if (alreadySalted) { 153 return; 154 } 155 // this random delay prevent consumers to be too much synchronized 156 if (policy.isSalted()) { 157 long randomDelay = ThreadLocalRandom.current() 158 .nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis()); 159 Thread.sleep(randomDelay); 160 } 161 alreadySalted = true; 162 } 163 164 protected void setTailerPosition(LogManager manager) { 165 ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset(); 166 if (manager.supportSubscribe() && seekPosition != ConsumerPolicy.StartOffset.LAST_COMMITTED) { 167 throw new UnsupportedOperationException( 168 "Tailer startOffset to " + seekPosition + " is not supported in subscribe mode"); 169 } 170 switch (policy.getStartOffset()) { 171 case BEGIN: 172 tailer.toStart(); 173 break; 174 case END: 175 tailer.toEnd(); 176 break; 177 default: 178 tailer.toLastCommitted(); 179 } 180 } 181 182 protected void consumerLoop() throws InterruptedException { 183 boolean end = false; 184 while (!end) { 185 Execution execution = new Execution(policy.getRetryPolicy()); 186 end = processBatchWithRetry(execution); 187 if (execution.getLastFailure() != null) { 188 if (policy.continueOnFailure()) { 189 log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure()); 190 } else { 191 log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure()); 192 end = true; 193 } 194 } 195 } 196 } 197 198 protected boolean processBatchWithRetry(Execution execution) throws InterruptedException { 199 boolean end = false; 200 while (!execution.isComplete()) { 201 try { 202 end = processBatch(); 203 tailer.commit(); 204 execution.complete(); 205 } catch (Throwable t) { 206 globalBatchFailureCounter.inc(); 207 batchFailureCounter += 1; 208 if (t instanceof InterruptedException) { 209 Thread.currentThread().interrupt(); 210 throw t; 211 } 212 if (t instanceof RebalanceException) { 213 log.info("Rebalance"); 214 // the current batch is rollback because of this exception 215 // we continue with the new tailer assignment 216 } else if (execution.canRetryOn(t)) { 217 setBatchRetryPolicy(); 218 tailer.toLastCommitted(); 219 } else { 220 throw t; 221 } 222 } 223 restoreBatchPolicy(); 224 } 225 return end; 226 } 227 228 protected void setBatchRetryPolicy() { 229 currentBatchPolicy = BatchPolicy.NO_BATCH; 230 } 231 232 protected void restoreBatchPolicy() { 233 currentBatchPolicy = policy.getBatchPolicy(); 234 } 235 236 protected boolean processBatch() throws InterruptedException { 237 boolean end = false; 238 beginBatch(); 239 try { 240 BatchState state = acceptBatch(); 241 commitBatch(state); 242 if (state.getState() == BatchState.State.LAST) { 243 log.info("No more message on tailer: " + tailer); 244 end = true; 245 } 246 } catch (Exception e) { 247 try { 248 rollbackBatch(e); 249 } catch (Exception rollbackException) { 250 log.error("Exception on rollback invocation", rollbackException); 251 // we propagate the initial error. 252 } 253 throw e; 254 } 255 return end; 256 } 257 258 protected void beginBatch() { 259 consumer.begin(); 260 } 261 262 protected void commitBatch(BatchState state) { 263 try (Timer.Context ignore = globalBatchCommitTimer.time()) { 264 consumer.commit(); 265 committedCounter += state.getSize(); 266 globalCommittedCounter.inc(state.getSize()); 267 batchCommitCounter += 1; 268 if (log.isDebugEnabled()) { 269 log.debug("Commit batch size: " + state.getSize() + ", total committed: " + committedCounter); 270 } 271 } 272 } 273 274 protected void rollbackBatch(Exception e) { 275 if (e instanceof RebalanceException) { 276 log.warn("Rollback current batch because of consumer rebalancing"); 277 } else { 278 log.warn("Rollback batch", e); 279 } 280 consumer.rollback(); 281 } 282 283 protected BatchState acceptBatch() throws InterruptedException { 284 BatchState batch = new BatchState(currentBatchPolicy); 285 batch.start(); 286 LogRecord<M> record; 287 M message; 288 while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) { 289 addSalt(); // do this here so kafka subscription happens concurrently 290 message = record.message(); 291 if (message.poisonPill()) { 292 log.warn("Receive a poison pill: " + message); 293 batch.last(); 294 } else { 295 try (Timer.Context ignore = globalAcceptTimer.time()) { 296 setThreadName(message.getId()); 297 consumer.accept(message); 298 acceptCounter += 1; 299 } 300 batch.inc(); 301 if (message.forceBatch()) { 302 if (log.isDebugEnabled()) { 303 log.debug("Force end of batch: " + message); 304 } 305 batch.force(); 306 } 307 } 308 if (batch.getState() != BatchState.State.FILLING) { 309 return batch; 310 } 311 } 312 batch.last(); 313 log.info(String.format("No record after: %ds on %s, terminating", policy.getWaitMessageTimeout().getSeconds(), 314 consumerId)); 315 return batch; 316 } 317 318 protected void setThreadName(String message) { 319 String name = threadName + "-" + acceptCounter + "-" + message; 320 currentThread().setName(name); 321 } 322 323 @Override 324 public void onPartitionsRevoked(Collection<LogPartition> partitions) { 325 // log.info("Partitions revoked: " + partitions); 326 } 327 328 @Override 329 public void onPartitionsAssigned(Collection<LogPartition> partitions) { 330 consumerId = tailer.toString(); 331 setThreadName("rebalance-" + consumerId); 332 // log.error("Partitions assigned: " + consumerId); 333 // partitions are opened on last committed by default 334 } 335}