001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer.internals;
020
021import static java.lang.Thread.currentThread;
022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
023
024import java.util.Collection;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.stream.Collectors;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.lib.stream.codec.Codec;
034import org.nuxeo.lib.stream.log.LogManager;
035import org.nuxeo.lib.stream.log.LogPartition;
036import org.nuxeo.lib.stream.log.LogRecord;
037import org.nuxeo.lib.stream.log.LogTailer;
038import org.nuxeo.lib.stream.log.Name;
039import org.nuxeo.lib.stream.log.RebalanceException;
040import org.nuxeo.lib.stream.log.RebalanceListener;
041import org.nuxeo.lib.stream.pattern.Message;
042import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
043import org.nuxeo.lib.stream.pattern.consumer.Consumer;
044import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
045import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
046import org.nuxeo.lib.stream.pattern.consumer.ConsumerStatus;
047
048import io.dropwizard.metrics5.Counter;
049import io.dropwizard.metrics5.MetricRegistry;
050import io.dropwizard.metrics5.SharedMetricRegistries;
051import io.dropwizard.metrics5.Timer;
052
053import net.jodah.failsafe.Execution;
054
055/**
056 * Read messages from a tailer and drive a consumer according to its policy.
057 *
058 * @since 9.1
059 */
060public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, RebalanceListener {
061    private static final Log log = LogFactory.getLog(ConsumerRunner.class);
062
063    // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime
064    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
065
066    protected final ConsumerFactory<M> factory;
067
068    protected final ConsumerPolicy policy;
069
070    protected final LogTailer<M> tailer;
071
072    protected String consumerId;
073
074    protected BatchPolicy currentBatchPolicy;
075
076    protected String threadName;
077
078    protected Consumer<M> consumer;
079
080    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
081
082    protected long acceptCounter;
083
084    protected long committedCounter;
085
086    protected long batchCommitCounter;
087
088    protected long batchFailureCounter;
089
090    protected boolean alreadySalted;
091
092    // Metrics global to all threads, having metrics per thread is way too much
093    protected Timer globalAcceptTimer;
094
095    protected Counter globalCommittedCounter;
096
097    protected Timer globalBatchCommitTimer;
098
099    protected Counter globalBatchFailureCounter;
100
101    protected Counter globalConsumersCounter;
102
103    /**
104     * @deprecated since 11.1, due to serialization issue with java 11, use
105     *             {@link #ConsumerRunner(ConsumerFactory, ConsumerPolicy, LogManager, Codec, List)} which allows to
106     *             give a {@link org.nuxeo.lib.stream.codec.Codec codec} to {@link org.nuxeo.lib.stream.log.LogTailer
107     *             tailer}.
108     */
109    @Deprecated
110    @SuppressWarnings("unchecked")
111    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager,
112            List<LogPartition> defaultAssignments) {
113        this(factory, policy, manager, NO_CODEC, defaultAssignments);
114    }
115
116    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, Codec<M> codec,
117            List<LogPartition> defaultAssignments) {
118        this.factory = factory;
119        this.currentBatchPolicy = policy.getBatchPolicy();
120        this.policy = policy;
121        this.tailer = createTailer(manager, codec, defaultAssignments);
122        consumerId = tailer.toString();
123        globalConsumersCounter = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "consumers"));
124        setTailerPosition(manager);
125        log.debug("Consumer thread created tailing on: " + consumerId);
126    }
127
128    protected LogTailer<M> createTailer(LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments) {
129        LogTailer<M> tailer;
130        if (manager.supportSubscribe()) {
131            Set<Name> names = defaultAssignments.stream().map(LogPartition::name).collect(Collectors.toSet());
132            tailer = manager.subscribe(Name.ofUrn(policy.getName()), names, this, codec);
133        } else {
134            tailer = manager.createTailer(Name.ofUrn(policy.getName()), defaultAssignments, codec);
135        }
136        return tailer;
137    }
138
139    @Override
140    public ConsumerStatus call() throws Exception {
141        threadName = currentThread().getName();
142        setMetrics();
143        globalConsumersCounter.inc();
144        long start = System.currentTimeMillis();
145        consumer = factory.createConsumer(consumerId);
146        try {
147            consumerLoop();
148        } finally {
149            consumer.close();
150            globalConsumersCounter.dec();
151            tailer.close();
152        }
153        return new ConsumerStatus(consumerId, acceptCounter, committedCounter, batchCommitCounter, batchFailureCounter,
154                start, System.currentTimeMillis(), false);
155    }
156
157    protected void setMetrics() {
158        globalAcceptTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "accepted"));
159        globalCommittedCounter = registry.counter(
160                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "committed"));
161        globalBatchFailureCounter = registry.counter(
162                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchFailure"));
163        globalBatchCommitTimer = registry.timer(
164                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchCommit"));
165    }
166
167    protected void addSalt() throws InterruptedException {
168        if (alreadySalted) {
169            return;
170        }
171        // this random delay prevent consumers to be too much synchronized
172        if (policy.isSalted()) {
173            long randomDelay = ThreadLocalRandom.current()
174                                                .nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis());
175            Thread.sleep(randomDelay);
176        }
177        alreadySalted = true;
178    }
179
180    protected void setTailerPosition(LogManager manager) {
181        ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset();
182        if (manager.supportSubscribe() && seekPosition != ConsumerPolicy.StartOffset.LAST_COMMITTED) {
183            throw new UnsupportedOperationException(
184                    "Tailer startOffset to " + seekPosition + " is not supported in subscribe mode");
185        }
186        switch (policy.getStartOffset()) {
187        case BEGIN:
188            tailer.toStart();
189            break;
190        case END:
191            tailer.toEnd();
192            break;
193        default:
194            tailer.toLastCommitted();
195        }
196    }
197
198    protected void consumerLoop() throws InterruptedException {
199        boolean end = false;
200        while (!end) {
201            Execution execution = new Execution(policy.getRetryPolicy());
202            end = processBatchWithRetry(execution);
203            if (execution.getLastFailure() != null) {
204                if (policy.continueOnFailure()) {
205                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
206                } else {
207                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
208                    end = true;
209                }
210            }
211        }
212    }
213
214    protected boolean processBatchWithRetry(Execution execution) throws InterruptedException {
215        boolean end = false;
216        while (!execution.isComplete()) {
217            try {
218                end = processBatch();
219                tailer.commit();
220                execution.complete();
221            } catch (Throwable t) {
222                globalBatchFailureCounter.inc();
223                batchFailureCounter += 1;
224                if (t instanceof InterruptedException) {
225                    Thread.currentThread().interrupt();
226                    throw t;
227                }
228                if (t instanceof RebalanceException) {
229                    log.info("Rebalance");
230                    // the current batch is rollback because of this exception
231                    // we continue with the new tailer assignment
232                } else if (execution.canRetryOn(t)) {
233                    setBatchRetryPolicy();
234                    tailer.toLastCommitted();
235                } else {
236                    throw t;
237                }
238            }
239            restoreBatchPolicy();
240        }
241        return end;
242    }
243
244    protected void setBatchRetryPolicy() {
245        currentBatchPolicy = BatchPolicy.NO_BATCH;
246    }
247
248    protected void restoreBatchPolicy() {
249        currentBatchPolicy = policy.getBatchPolicy();
250    }
251
252    protected boolean processBatch() throws InterruptedException {
253        boolean end = false;
254        beginBatch();
255        try {
256            BatchState state = acceptBatch();
257            commitBatch(state);
258            if (state.getState() == BatchState.State.LAST) {
259                log.info("No more message on tailer: " + tailer);
260                end = true;
261            }
262        } catch (Exception e) {
263            try {
264                rollbackBatch(e);
265            } catch (Exception rollbackException) {
266                log.error("Exception on rollback invocation", rollbackException);
267                // we propagate the initial error.
268            }
269            throw e;
270        }
271        return end;
272    }
273
274    protected void beginBatch() {
275        consumer.begin();
276    }
277
278    protected void commitBatch(BatchState state) {
279        try (Timer.Context ignore = globalBatchCommitTimer.time()) {
280            consumer.commit();
281            committedCounter += state.getSize();
282            globalCommittedCounter.inc(state.getSize());
283            batchCommitCounter += 1;
284            if (log.isDebugEnabled()) {
285                log.debug("Commit batch size: " + state.getSize() + ", total committed: " + committedCounter);
286            }
287        }
288    }
289
290    protected void rollbackBatch(Exception e) {
291        if (e instanceof RebalanceException) {
292            log.warn("Rollback current batch because of consumer rebalancing");
293        } else {
294            log.warn("Rollback batch", e);
295        }
296        consumer.rollback();
297    }
298
299    protected BatchState acceptBatch() throws InterruptedException {
300        BatchState batch = new BatchState(currentBatchPolicy);
301        batch.start();
302        LogRecord<M> record;
303        M message;
304        while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) {
305            addSalt(); // do this here so kafka subscription happens concurrently
306            message = record.message();
307            if (message.poisonPill()) {
308                log.warn("Receive a poison pill: " + message);
309                batch.last();
310            } else {
311                try (Timer.Context ignore = globalAcceptTimer.time()) {
312                    setThreadName(message.getId());
313                    consumer.accept(message);
314                    acceptCounter += 1;
315                }
316                batch.inc();
317                if (message.forceBatch()) {
318                    if (log.isDebugEnabled()) {
319                        log.debug("Force end of batch: " + message);
320                    }
321                    batch.force();
322                }
323            }
324            if (batch.getState() != BatchState.State.FILLING) {
325                return batch;
326            }
327        }
328        batch.last();
329        log.info(String.format("No record after: %ds on %s, terminating", policy.getWaitMessageTimeout().getSeconds(),
330                consumerId));
331        return batch;
332    }
333
334    protected void setThreadName(String message) {
335        String name = threadName + "-" + acceptCounter + "-" + message;
336        currentThread().setName(name);
337    }
338
339    @Override
340    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
341        setThreadName("rebalance-revoked");
342    }
343
344    @Override
345    public void onPartitionsLost(Collection<LogPartition> partitions) {
346        setThreadName("rebalance-LOST");
347    }
348
349    @Override
350    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
351        consumerId = tailer.toString();
352        setThreadName("rebalance-assigned-" + tailer.toString());
353    }
354}