001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals; 020 021import com.codahale.metrics.Counter; 022import com.codahale.metrics.MetricRegistry; 023import com.codahale.metrics.SharedMetricRegistries; 024import com.codahale.metrics.Timer; 025import net.jodah.failsafe.Execution; 026import net.openhft.chronicle.core.util.Time; 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager; 030import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition; 031import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException; 032import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener; 033import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord; 034import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer; 035import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message; 036import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.BatchPolicy; 037import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.Consumer; 038import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerFactory; 039import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy; 040import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerStatus; 041 042import java.util.Collection; 043import java.util.List; 044import java.util.Set; 045import java.util.concurrent.Callable; 046import java.util.concurrent.ThreadLocalRandom; 047import java.util.stream.Collectors; 048 049import static java.lang.Thread.currentThread; 050import static org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy.StartOffset.LAST_COMMITTED; 051 052/** 053 * Read messages from a tailer and drive a consumer according to its policy. 054 * 055 * @since 9.1 056 */ 057public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, MQRebalanceListener { 058 private static final Log log = LogFactory.getLog(ConsumerRunner.class); 059 060 // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime 061 public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService"; 062 063 private final ConsumerFactory<M> factory; 064 private final ConsumerPolicy policy; 065 private final MQTailer<M> tailer; 066 private String consumerId; 067 private BatchPolicy currentBatchPolicy; 068 private String threadName; 069 private Consumer<M> consumer; 070 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME); 071 protected Timer acceptTimer; 072 protected Counter committedCounter; 073 protected Timer batchCommitTimer; 074 protected Counter batchFailureCount; 075 protected Counter consumersCount; 076 private boolean alreadySalted = false; 077 078 079 public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, MQManager<M> manager, 080 List<MQPartition> defaultAssignments) { 081 this.factory = factory; 082 this.currentBatchPolicy = policy.getBatchPolicy(); 083 this.policy = policy; 084 this.tailer = createTailer(manager, defaultAssignments); 085 consumerId = tailer.toString(); 086 consumersCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumers")); 087 setTailerPosition(manager); 088 log.debug("Consumer thread created tailing on: " + consumerId); 089 } 090 091 private MQTailer<M> createTailer(MQManager<M> manager, List<MQPartition> defaultAssignments) { 092 MQTailer<M> tailer; 093 if (manager.supportSubscribe()) { 094 Set<String> names = defaultAssignments.stream().map(MQPartition::name).collect(Collectors.toSet()); 095 tailer = manager.subscribe(policy.getName(), names, this); 096 } else { 097 tailer = manager.createTailer(policy.getName(), defaultAssignments); 098 } 099 return tailer; 100 } 101 102 private Counter newCounter(String name) { 103 registry.remove(name); 104 return registry.counter(name); 105 } 106 107 private Timer newTimer(String name) { 108 registry.remove(name); 109 return registry.timer(name); 110 } 111 112 @Override 113 public ConsumerStatus call() throws Exception { 114 threadName = currentThread().getName(); 115 setMetrics(threadName); 116 consumersCount.inc(); 117 long start = Time.currentTimeMillis(); 118 consumer = factory.createConsumer(consumerId); 119 try { 120 consumerLoop(); 121 } finally { 122 consumer.close(); 123 consumersCount.dec(); 124 } 125 return new ConsumerStatus(consumerId, acceptTimer.getCount(), committedCounter.getCount(), 126 batchCommitTimer.getCount(), batchFailureCount.getCount(), start, Time.currentTimeMillis(), false); 127 } 128 129 private void setMetrics(String name) { 130 acceptTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "accepted", name)); 131 committedCounter = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "committed", name)); 132 batchFailureCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchFailure", name)); 133 batchCommitTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchCommit", name)); 134 } 135 136 private void addSalt() throws InterruptedException { 137 if (alreadySalted) { 138 return; 139 } 140 // this random delay prevent consumers to be too much synchronized 141 if (policy.isSalted()) { 142 long randomDelay = ThreadLocalRandom.current().nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis()); 143 Thread.sleep(randomDelay); 144 } 145 alreadySalted = true; 146 } 147 148 private void setTailerPosition(MQManager<M> manager) { 149 ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset(); 150 if (manager.supportSubscribe() && seekPosition != LAST_COMMITTED) { 151 throw new UnsupportedOperationException("Tailer startOffset to " + seekPosition + " is not supported in subscribe mode"); 152 } 153 switch (policy.getStartOffset()) { 154 case BEGIN: 155 tailer.toStart(); 156 break; 157 case END: 158 tailer.toEnd(); 159 break; 160 default: 161 tailer.toLastCommitted(); 162 } 163 } 164 165 private void consumerLoop() throws InterruptedException { 166 boolean end = false; 167 while (!end) { 168 Execution execution = new Execution(policy.getRetryPolicy()); 169 end = processBatchWithRetry(execution); 170 if (execution.getLastFailure() != null) { 171 if (policy.continueOnFailure()) { 172 log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure()); 173 } else { 174 log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure()); 175 end = true; 176 } 177 } 178 } 179 } 180 181 private boolean processBatchWithRetry(Execution execution) throws InterruptedException { 182 boolean end = false; 183 while (!execution.isComplete()) { 184 try { 185 end = processBatch(); 186 tailer.commit(); 187 execution.complete(); 188 } catch (Throwable t) { 189 batchFailureCount.inc(); 190 if (t instanceof InterruptedException) { 191 Thread.currentThread().interrupt(); 192 throw t; 193 } 194 if (t instanceof MQRebalanceException) { 195 log.info("Rebalance"); 196 // the current batch is rollback because of this exception 197 // we continue with the new tailer assignment 198 } else if (execution.canRetryOn(t)) { 199 setBatchRetryPolicy(); 200 tailer.toLastCommitted(); 201 } else { 202 throw t; 203 } 204 } 205 restoreBatchPolicy(); 206 } 207 return end; 208 } 209 210 private void setBatchRetryPolicy() { 211 currentBatchPolicy = BatchPolicy.NO_BATCH; 212 } 213 214 private void restoreBatchPolicy() { 215 currentBatchPolicy = policy.getBatchPolicy(); 216 } 217 218 private boolean processBatch() throws InterruptedException { 219 boolean end = false; 220 beginBatch(); 221 try { 222 BatchState state = acceptBatch(); 223 commitBatch(state); 224 if (state.getState() == BatchState.State.LAST) { 225 log.info("No more message on tailer: " + tailer); 226 end = true; 227 } 228 } catch (Exception e) { 229 try { 230 rollbackBatch(); 231 } catch (Exception rollbackException) { 232 log.error("Exception on rollback invocation", rollbackException); 233 // we propagate the initial error. 234 } 235 throw e; 236 } 237 return end; 238 } 239 240 private void beginBatch() { 241 consumer.begin(); 242 } 243 244 private void commitBatch(BatchState state) { 245 try (Timer.Context ignore = batchCommitTimer.time()) { 246 consumer.commit(); 247 committedCounter.inc(state.getSize()); 248 if (log.isDebugEnabled()) { 249 log.debug("Commit batch size: " + state.getSize() + 250 ", total committed: " + committedCounter.getCount()); 251 } 252 } 253 } 254 255 private void rollbackBatch() { 256 log.warn("Rollback batch"); 257 consumer.rollback(); 258 } 259 260 private BatchState acceptBatch() throws InterruptedException { 261 BatchState batch = new BatchState(currentBatchPolicy); 262 batch.start(); 263 MQRecord<M> record; 264 M message; 265 while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) { 266 // addSalt(); // do this here so kafka subscription happens concurrently 267 message = record.message(); 268 if (message.poisonPill()) { 269 log.warn("Receive a poison pill: " + message); 270 batch.last(); 271 } else { 272 try (Timer.Context ignore = acceptTimer.time()) { 273 setThreadName(message); 274 consumer.accept(message); 275 } 276 batch.inc(); 277 if (message.forceBatch()) { 278 if (log.isDebugEnabled()) { 279 log.debug("Force end of batch: " + message); 280 } 281 batch.force(); 282 } 283 } 284 if (batch.getState() != BatchState.State.FILLING) { 285 return batch; 286 } 287 } 288 batch.last(); 289 return batch; 290 } 291 292 private void setThreadName(M message) { 293 String name = threadName + "-" + acceptTimer.getCount(); 294 if (message != null) { 295 name += "-" + message.getId(); 296 } else { 297 name += "-null"; 298 } 299 currentThread().setName(name); 300 } 301 302 @Override 303 public void onPartitionsRevoked(Collection<MQPartition> partitions) { 304 // log.info("Partitions revoked: " + partitions); 305 } 306 307 @Override 308 public void onPartitionsAssigned(Collection<MQPartition> partitions) { 309 consumerId = tailer.toString(); 310 // log.error("Partitions assigned: " + consumerId); 311 // partitions are opened on last committed by default 312 } 313}