001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer.internals;
020
021import static java.lang.Thread.currentThread;
022
023import java.util.Collection;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.stream.Collectors;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.lib.stream.log.LogManager;
033import org.nuxeo.lib.stream.log.LogPartition;
034import org.nuxeo.lib.stream.log.LogRecord;
035import org.nuxeo.lib.stream.log.LogTailer;
036import org.nuxeo.lib.stream.log.RebalanceException;
037import org.nuxeo.lib.stream.log.RebalanceListener;
038import org.nuxeo.lib.stream.pattern.Message;
039import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
040import org.nuxeo.lib.stream.pattern.consumer.Consumer;
041import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
042import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
043import org.nuxeo.lib.stream.pattern.consumer.ConsumerStatus;
044
045import com.codahale.metrics.Counter;
046import com.codahale.metrics.MetricRegistry;
047import com.codahale.metrics.SharedMetricRegistries;
048import com.codahale.metrics.Timer;
049
050import net.jodah.failsafe.Execution;
051
052/**
053 * Read messages from a tailer and drive a consumer according to its policy.
054 *
055 * @since 9.1
056 */
057public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, RebalanceListener {
058    private static final Log log = LogFactory.getLog(ConsumerRunner.class);
059
060    // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime
061    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
062
063    protected final ConsumerFactory<M> factory;
064
065    protected final ConsumerPolicy policy;
066
067    protected final LogTailer<M> tailer;
068
069    protected String consumerId;
070
071    protected BatchPolicy currentBatchPolicy;
072
073    protected String threadName;
074
075    protected Consumer<M> consumer;
076
077    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
078
079    protected long acceptCounter;
080
081    protected long committedCounter;
082
083    protected long batchCommitCounter;
084
085    protected long batchFailureCounter;
086
087    protected boolean alreadySalted;
088
089    // Metrics global to all threads, having metrics per thread is way too much
090    protected Timer globalAcceptTimer;
091
092    protected Counter globalCommittedCounter;
093
094    protected Timer globalBatchCommitTimer;
095
096    protected Counter globalBatchFailureCounter;
097
098    protected Counter globalConsumersCounter;
099
100    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager,
101            List<LogPartition> defaultAssignments) {
102        this.factory = factory;
103        this.currentBatchPolicy = policy.getBatchPolicy();
104        this.policy = policy;
105        this.tailer = createTailer(manager, defaultAssignments);
106        consumerId = tailer.toString();
107        globalConsumersCounter = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "consumers"));
108        setTailerPosition(manager);
109        log.debug("Consumer thread created tailing on: " + consumerId);
110    }
111
112    protected LogTailer<M> createTailer(LogManager manager, List<LogPartition> defaultAssignments) {
113        LogTailer<M> tailer;
114        if (manager.supportSubscribe()) {
115            Set<String> names = defaultAssignments.stream().map(LogPartition::name).collect(Collectors.toSet());
116            tailer = manager.subscribe(policy.getName(), names, this);
117        } else {
118            tailer = manager.createTailer(policy.getName(), defaultAssignments);
119        }
120        return tailer;
121    }
122
123    @Override
124    public ConsumerStatus call() throws Exception {
125        threadName = currentThread().getName();
126        setMetrics();
127        globalConsumersCounter.inc();
128        long start = System.currentTimeMillis();
129        consumer = factory.createConsumer(consumerId);
130        try {
131            consumerLoop();
132        } finally {
133            consumer.close();
134            globalConsumersCounter.dec();
135            tailer.close();
136        }
137        return new ConsumerStatus(consumerId, acceptCounter, committedCounter, batchCommitCounter, batchFailureCounter,
138                start, System.currentTimeMillis(), false);
139    }
140
141    protected void setMetrics() {
142        globalAcceptTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "accepted"));
143        globalCommittedCounter = registry.counter(
144                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "committed"));
145        globalBatchFailureCounter = registry.counter(
146                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchFailure"));
147        globalBatchCommitTimer = registry.timer(
148                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchCommit"));
149    }
150
151    protected void addSalt() throws InterruptedException {
152        if (alreadySalted) {
153            return;
154        }
155        // this random delay prevent consumers to be too much synchronized
156        if (policy.isSalted()) {
157            long randomDelay = ThreadLocalRandom.current()
158                                                .nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis());
159            Thread.sleep(randomDelay);
160        }
161        alreadySalted = true;
162    }
163
164    protected void setTailerPosition(LogManager manager) {
165        ConsumerPolicy.StartOffset seekPosition = policy.getStartOffset();
166        if (manager.supportSubscribe() && seekPosition != ConsumerPolicy.StartOffset.LAST_COMMITTED) {
167            throw new UnsupportedOperationException(
168                    "Tailer startOffset to " + seekPosition + " is not supported in subscribe mode");
169        }
170        switch (policy.getStartOffset()) {
171        case BEGIN:
172            tailer.toStart();
173            break;
174        case END:
175            tailer.toEnd();
176            break;
177        default:
178            tailer.toLastCommitted();
179        }
180    }
181
182    protected void consumerLoop() throws InterruptedException {
183        boolean end = false;
184        while (!end) {
185            Execution execution = new Execution(policy.getRetryPolicy());
186            end = processBatchWithRetry(execution);
187            if (execution.getLastFailure() != null) {
188                if (policy.continueOnFailure()) {
189                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
190                } else {
191                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
192                    end = true;
193                }
194            }
195        }
196    }
197
198    protected boolean processBatchWithRetry(Execution execution) throws InterruptedException {
199        boolean end = false;
200        while (!execution.isComplete()) {
201            try {
202                end = processBatch();
203                tailer.commit();
204                execution.complete();
205            } catch (Throwable t) {
206                globalBatchFailureCounter.inc();
207                batchFailureCounter += 1;
208                if (t instanceof InterruptedException) {
209                    Thread.currentThread().interrupt();
210                    throw t;
211                }
212                if (t instanceof RebalanceException) {
213                    log.info("Rebalance");
214                    // the current batch is rollback because of this exception
215                    // we continue with the new tailer assignment
216                } else if (execution.canRetryOn(t)) {
217                    setBatchRetryPolicy();
218                    tailer.toLastCommitted();
219                } else {
220                    throw t;
221                }
222            }
223            restoreBatchPolicy();
224        }
225        return end;
226    }
227
228    protected void setBatchRetryPolicy() {
229        currentBatchPolicy = BatchPolicy.NO_BATCH;
230    }
231
232    protected void restoreBatchPolicy() {
233        currentBatchPolicy = policy.getBatchPolicy();
234    }
235
236    protected boolean processBatch() throws InterruptedException {
237        boolean end = false;
238        beginBatch();
239        try {
240            BatchState state = acceptBatch();
241            commitBatch(state);
242            if (state.getState() == BatchState.State.LAST) {
243                log.info("No more message on tailer: " + tailer);
244                end = true;
245            }
246        } catch (Exception e) {
247            try {
248                rollbackBatch(e);
249            } catch (Exception rollbackException) {
250                log.error("Exception on rollback invocation", rollbackException);
251                // we propagate the initial error.
252            }
253            throw e;
254        }
255        return end;
256    }
257
258    protected void beginBatch() {
259        consumer.begin();
260    }
261
262    protected void commitBatch(BatchState state) {
263        try (Timer.Context ignore = globalBatchCommitTimer.time()) {
264            consumer.commit();
265            committedCounter += state.getSize();
266            globalCommittedCounter.inc(state.getSize());
267            batchCommitCounter += 1;
268            if (log.isDebugEnabled()) {
269                log.debug("Commit batch size: " + state.getSize() + ", total committed: " + committedCounter);
270            }
271        }
272    }
273
274    protected void rollbackBatch(Exception e) {
275        if (e instanceof RebalanceException) {
276            log.warn("Rollback current batch because of consumer rebalancing");
277        } else {
278            log.warn("Rollback batch", e);
279        }
280        consumer.rollback();
281    }
282
283    protected BatchState acceptBatch() throws InterruptedException {
284        BatchState batch = new BatchState(currentBatchPolicy);
285        batch.start();
286        LogRecord<M> record;
287        M message;
288        while ((record = tailer.read(policy.getWaitMessageTimeout())) != null) {
289            addSalt(); // do this here so kafka subscription happens concurrently
290            message = record.message();
291            if (message.poisonPill()) {
292                log.warn("Receive a poison pill: " + message);
293                batch.last();
294            } else {
295                try (Timer.Context ignore = globalAcceptTimer.time()) {
296                    setThreadName(message.getId());
297                    consumer.accept(message);
298                    acceptCounter += 1;
299                }
300                batch.inc();
301                if (message.forceBatch()) {
302                    if (log.isDebugEnabled()) {
303                        log.debug("Force end of batch: " + message);
304                    }
305                    batch.force();
306                }
307            }
308            if (batch.getState() != BatchState.State.FILLING) {
309                return batch;
310            }
311        }
312        batch.last();
313        log.info(String.format("No record after: %ds on %s, terminating", policy.getWaitMessageTimeout().getSeconds(),
314                consumerId));
315        return batch;
316    }
317
318    protected void setThreadName(String message) {
319        String name = threadName + "-" + acceptCounter + "-" + message;
320        currentThread().setName(name);
321    }
322
323    @Override
324    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
325        // log.info("Partitions revoked: " + partitions);
326    }
327
328    @Override
329    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
330        consumerId = tailer.toString();
331        setThreadName("rebalance-" + consumerId);
332        // log.error("Partitions assigned: " + consumerId);
333        // partitions are opened on last committed by default
334    }
335}