001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import com.codahale.metrics.Counter;
022import com.codahale.metrics.MetricRegistry;
023import com.codahale.metrics.SharedMetricRegistries;
024import com.codahale.metrics.Timer;
025import net.jodah.failsafe.Execution;
026import net.openhft.chronicle.core.util.Time;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
030import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;
031
032import java.util.concurrent.Callable;
033import java.util.concurrent.ThreadLocalRandom;
034
035import static java.lang.Thread.currentThread;
036
037/**
038 * Read messages from a tailer and drive a consumer according to its policy.
039 *
040 * @since 9.1
041 */
042public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus> {
043    private static final Log log = LogFactory.getLog(ConsumerRunner.class);
044
045    // This is the registry name used by Nuxeo without adding a dependency nuxeo-runtime
046    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
047
048    private final ConsumerFactory<M> factory;
049    private final ConsumerPolicy policy;
050    private final int queue;
051    private final MQueues.Tailer<M> tailer;
052    private BatchPolicy currentBatchPolicy;
053    private String threadName;
054    private Consumer<M> consumer;
055
056    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
057    protected final Timer acceptTimer;
058    protected final Counter committedCounter;
059    protected final Timer batchCommitTimer;
060    protected final Counter batchFailureCount;
061    protected final Counter consumersCount;
062
063
064    public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, MQueues.Tailer<M> tailer) {
065        this.factory = factory;
066        this.tailer = tailer;
067        this.currentBatchPolicy = policy.getBatchPolicy();
068        this.policy = policy;
069        queue = tailer.getQueue();
070        consumersCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumers"));
071        acceptTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "accepted", String.valueOf(queue)));
072        committedCounter = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "committed", String.valueOf(queue)));
073        batchFailureCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchFailure", String.valueOf(queue)));
074        batchCommitTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "batchCommit", String.valueOf(queue)));
075        log.debug("Consumer thread created tailing on queue: " + queue);
076    }
077
078    private Counter newCounter(String name) {
079        registry.remove(name);
080        return registry.counter(name);
081    }
082
083    private Timer newTimer(String name) {
084        registry.remove(name);
085        return registry.timer(name);
086    }
087
088    @Override
089    public ConsumerStatus call() throws Exception {
090        threadName = currentThread().getName();
091        consumersCount.inc();
092        long start = Time.currentTimeMillis();
093        setTailerPosition();
094        consumer = factory.createConsumer(queue);
095        try {
096            addSalt();
097            consumerLoop();
098        } finally {
099            consumer.close();
100            consumersCount.dec();
101        }
102        return new ConsumerStatus(queue, acceptTimer.getCount(), committedCounter.getCount(),
103                batchCommitTimer.getCount(), batchFailureCount.getCount(), start, Time.currentTimeMillis(), false);
104    }
105
106    private void addSalt() throws InterruptedException {
107        long randomDelay = ThreadLocalRandom.current().nextLong(policy.getBatchPolicy().getTimeThreshold().toMillis());
108        if (policy.isSalted()) {
109            Thread.sleep(randomDelay);
110        }
111    }
112
113    private void setTailerPosition() {
114        switch (policy.getStartOffset()) {
115            case BEGIN:
116                tailer.toStart();
117                break;
118            case END:
119                tailer.toEnd();
120                break;
121            default:
122                tailer.toLastCommitted();
123        }
124    }
125
126    private void consumerLoop() throws InterruptedException {
127        boolean end = false;
128        while (!end) {
129            Execution execution = new Execution(policy.getRetryPolicy());
130            end = processBatchWithRetry(execution);
131            if (execution.getLastFailure() != null) {
132                if (policy.continueOnFailure()) {
133                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
134                } else {
135                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
136                    end = true;
137                }
138            }
139        }
140    }
141
142    private boolean processBatchWithRetry(Execution execution) throws InterruptedException {
143        boolean end = false;
144        while (!execution.isComplete()) {
145            try {
146                end = processBatch();
147                execution.complete();
148                tailer.commit();
149            } catch (Throwable t) {
150                batchFailureCount.inc();
151                if (!execution.canRetryOn(t)) {
152                    if (t instanceof InterruptedException) {
153                        Thread.currentThread().interrupt();
154                    }
155                    throw t;
156                }
157                setBatchRetryPolicy();
158                tailer.toLastCommitted();
159            }
160            restoreBatchPolicy();
161        }
162        return end;
163    }
164
165    private void setBatchRetryPolicy() {
166        currentBatchPolicy = BatchPolicy.NO_BATCH;
167    }
168
169    private void restoreBatchPolicy() {
170        currentBatchPolicy = policy.getBatchPolicy();
171    }
172
173    private boolean processBatch() throws InterruptedException {
174        boolean end = false;
175        beginBatch();
176        try {
177            BatchState state = acceptBatch();
178            commitBatch(state);
179            if (state.getState() == BatchState.State.LAST) {
180                log.info(String.format("No more message on queue %02d", queue));
181                end = true;
182            }
183        } catch (Exception e) {
184            try {
185                rollbackBatch();
186            } catch (Exception rollbackException) {
187                log.error("Exception on rollback invocation", rollbackException);
188                // we propagate the initial error.
189            }
190            throw e;
191        }
192        return end;
193    }
194
195    private void beginBatch() {
196        consumer.begin();
197    }
198
199    private void commitBatch(BatchState state) {
200        try (Timer.Context ignore = batchCommitTimer.time()) {
201            consumer.commit();
202            committedCounter.inc(state.getSize());
203        }
204    }
205
206    private void rollbackBatch() {
207        log.warn("Rollback batch");
208        consumer.rollback();
209    }
210
211    private BatchState acceptBatch() throws InterruptedException {
212        BatchState batch = new BatchState(currentBatchPolicy);
213        batch.start();
214        M message;
215        while ((message = tailer.read(policy.getWaitMessageTimeout())) != null) {
216            if (message.poisonPill()) {
217                log.warn("Receivce a poison pill: " + message);
218                batch.last();
219            } else {
220                try (Timer.Context ignore = acceptTimer.time()) {
221                    setThreadName(message);
222                    consumer.accept(message);
223                }
224                batch.inc();
225                if (message.forceBatch()) {
226                    if (log.isDebugEnabled()) {
227                        log.debug("Force end of batch: " + message);
228                    }
229                    batch.force();
230                }
231            }
232            if (batch.getState() != BatchState.State.FILLING) {
233                return batch;
234            }
235        }
236        batch.last();
237        return batch;
238    }
239
240    private void setThreadName(M message) {
241        String name = threadName + "-" + acceptTimer.getCount();
242        if (message != null) {
243            name += "-" + message.getId();
244        } else {
245            name += "-null";
246        }
247        currentThread().setName(name);
248    }
249}