001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals;
020
021import com.codahale.metrics.Counter;
022import com.codahale.metrics.MetricRegistry;
023import com.codahale.metrics.SharedMetricRegistries;
024import com.codahale.metrics.Timer;
025import net.jodah.failsafe.Execution;
026import net.openhft.chronicle.core.util.Time;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
030import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
031import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException;
032import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
033import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
034import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
035import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
036import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.BatchPolicy;
037import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.Consumer;
038import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerFactory;
039import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy;
040import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerStatus;
041
042import java.util.Collection;
043import java.util.List;
044import java.util.Set;
045import java.util.concurrent.Callable;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.stream.Collectors;
048
049import static java.lang.Thread.currentThread;
050import static org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy.StartOffset.LAST_COMMITTED;
051
052/**
053 * Read messages from a tailer and drive a consumer according to its policy.
054 *
055 * @since 9.1
056 */
057public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, MQRebalanceListener {
058    private static final Log log = LogFactory.getLog(ConsumerRunner.class);
059
060    // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime
061    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
062
063    private final ConsumerFactory<M> factory;
064    private final ConsumerPolicy policy;
065    private final MQTailer<M> tailer;
066    private String consumerId;
067    private BatchPolicy currentBatchPolicy;
068    private String threadName;
069    private Consumer<M> consumer;
070    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
071    protected Timer acceptTimer;
072    protected Counter committedCounter;
073    protected Timer batchCommitTimer;
074    protected Counter batchFailureCount;
075    protected Counter consumersCount;
076    private boolean alreadySalted = false;
077
078
079    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, MQManager<M> manager,
080                          List<MQPartition> defaultAssignments) {
081        this.factory = factory;
082        this.currentBatchPolicy = policy.getBatchPolicy();
083        this.policy = policy;
084        this.tailer = createTailer(manager, defaultAssignments);
085        consumerId = tailer.toString();
086        consumersCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumers"));
087        setTailerPosition(manager);
088        log.debug("Consumer thread created tailing on: " + consumerId);
089    }
090
091    private MQTailer<M> createTailer(MQManager<M> manager, List<MQPartition> defaultAssignments) {
092        MQTailer<M> tailer;
093        if (manager.supportSubscribe()) {
094            Set<String> names = defaultAssignments.stream().map(MQPartition::name).collect(Collectors.toSet());
095            tailer = manager.subscribe(policy.getName(), names, this);
096        } else {
097            tailer = manager.createTailer(policy.getName(), defaultAssignments);
098        }
099        return tailer;
100    }
101
102    private Counter newCounter(String name) {
103        registry.remove(name);
104        return registry.counter(name);
105    }
106
107    private Timer newTimer(String name) {
108        registry.remove(name);
109        return registry.timer(name);
110    }
111
112    @Override
113    public ConsumerStatus call() throws Exception {
114        threadName = currentThread().getName();
115        setMetrics(threadName);
116        consumersCount.inc();
117        long start = Time.currentTimeMillis();
118        consumer = factory.createConsumer(consumerId);
119        try {
120            consumerLoop();
121        } finally {
122            consumer.close();
123            consumersCount.dec();
124        }
125        return new ConsumerStatus(consumerId, acceptTimer.getCount(), committedCounter.getCount(),
126                batchCommitTimer.getCount(), batchFailureCount.getCount(), start, Time.currentTimeMillis(), false);
127    }
128
129    private void setMetrics(String name) {
130        acceptTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "accepted", name));
131        committedCounter = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "committed", name));
132        batchFailureCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchFailure", name));
133        batchCommitTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchCommit", name));
134    }
135
136    private void addSalt() throws InterruptedException {
137        if (alreadySalted) {
138            return;
139        }
140        // this random delay prevent consumers to be too much synchronized
141        if (policy.isSalted()) {
142            long randomDelay = ThreadLocalRandom.current().nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis());
143            Thread.sleep(randomDelay);
144        }
145        alreadySalted = true;
146    }
147
148    private void setTailerPosition(MQManager<M> manager) {
149        ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset();
150        if (manager.supportSubscribe() && seekPosition != LAST_COMMITTED) {
151            throw new UnsupportedOperationException("Tailer startOffset to " + seekPosition + " is not supported in subscribe mode");
152        }
153        switch (policy.getStartOffset()) {
154            case BEGIN:
155                tailer.toStart();
156                break;
157            case END:
158                tailer.toEnd();
159                break;
160            default:
161                tailer.toLastCommitted();
162        }
163    }
164
165    private void consumerLoop() throws InterruptedException {
166        boolean end = false;
167        while (!end) {
168            Execution execution = new Execution(policy.getRetryPolicy());
169            end = processBatchWithRetry(execution);
170            if (execution.getLastFailure() != null) {
171                if (policy.continueOnFailure()) {
172                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
173                } else {
174                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
175                    end = true;
176                }
177            }
178        }
179    }
180
181    private boolean processBatchWithRetry(Execution execution) throws InterruptedException {
182        boolean end = false;
183        while (!execution.isComplete()) {
184            try {
185                end = processBatch();
186                tailer.commit();
187                execution.complete();
188            } catch (Throwable t) {
189                batchFailureCount.inc();
190                if (t instanceof InterruptedException) {
191                    Thread.currentThread().interrupt();
192                    throw t;
193                }
194                if (t instanceof MQRebalanceException) {
195                    log.info("Rebalance");
196                    // the current batch is rollback because of this exception
197                    // we continue with the new tailer assignment
198                } else if (execution.canRetryOn(t)) {
199                    setBatchRetryPolicy();
200                    tailer.toLastCommitted();
201                } else {
202                    throw t;
203                }
204            }
205            restoreBatchPolicy();
206        }
207        return end;
208    }
209
210    private void setBatchRetryPolicy() {
211        currentBatchPolicy = BatchPolicy.NO_BATCH;
212    }
213
214    private void restoreBatchPolicy() {
215        currentBatchPolicy = policy.getBatchPolicy();
216    }
217
218    private boolean processBatch() throws InterruptedException {
219        boolean end = false;
220        beginBatch();
221        try {
222            BatchState state = acceptBatch();
223            commitBatch(state);
224            if (state.getState() == BatchState.State.LAST) {
225                log.info("No more message on tailer: " + tailer);
226                end = true;
227            }
228        } catch (Exception e) {
229            try {
230                rollbackBatch();
231            } catch (Exception rollbackException) {
232                log.error("Exception on rollback invocation", rollbackException);
233                // we propagate the initial error.
234            }
235            throw e;
236        }
237        return end;
238    }
239
240    private void beginBatch() {
241        consumer.begin();
242    }
243
244    private void commitBatch(BatchState state) {
245        try (Timer.Context ignore = batchCommitTimer.time()) {
246            consumer.commit();
247            committedCounter.inc(state.getSize());
248            if (log.isDebugEnabled()) {
249                log.debug("Commit batch size: " + state.getSize() +
250                        ", total committed: " + committedCounter.getCount());
251            }
252        }
253    }
254
255    private void rollbackBatch() {
256        log.warn("Rollback batch");
257        consumer.rollback();
258    }
259
260    private BatchState acceptBatch() throws InterruptedException {
261        BatchState batch = new BatchState(currentBatchPolicy);
262        batch.start();
263        MQRecord<M> record;
264        M message;
265        while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) {
266            // addSalt(); // do this here so kafka subscription happens concurrently
267            message = record.message();
268            if (message.poisonPill()) {
269                log.warn("Receive a poison pill: " + message);
270                batch.last();
271            } else {
272                try (Timer.Context ignore = acceptTimer.time()) {
273                    setThreadName(message);
274                    consumer.accept(message);
275                }
276                batch.inc();
277                if (message.forceBatch()) {
278                    if (log.isDebugEnabled()) {
279                        log.debug("Force end of batch: " + message);
280                    }
281                    batch.force();
282                }
283            }
284            if (batch.getState() != BatchState.State.FILLING) {
285                return batch;
286            }
287        }
288        batch.last();
289        return batch;
290    }
291
292    private void setThreadName(M message) {
293        String name = threadName + "-" + acceptTimer.getCount();
294        if (message != null) {
295            name += "-" + message.getId();
296        } else {
297            name += "-null";
298        }
299        currentThread().setName(name);
300    }
301
302    @Override
303    public void onPartitionsRevoked(Collection<MQPartition> partitions) {
304        // log.info("Partitions revoked: " + partitions);
305    }
306
307    @Override
308    public void onPartitionsAssigned(Collection<MQPartition> partitions) {
309        consumerId = tailer.toString();
310        // log.error("Partitions assigned: " + consumerId);
311        // partitions are opened on last committed by default
312    }
313}