001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue.consumer;
018
019import com.codahale.metrics.Counter;
020import com.codahale.metrics.MetricRegistry;
021import com.codahale.metrics.SharedMetricRegistries;
022import com.codahale.metrics.Timer;
023
024import org.nuxeo.common.utils.ExceptionUtils;
025import org.nuxeo.ecm.core.api.CoreSession;
026import org.nuxeo.ecm.core.api.DocumentModel;
027import org.nuxeo.ecm.core.api.DocumentRef;
028import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
029import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
030import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner;
031import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
032import org.nuxeo.ecm.platform.importer.source.SourceNode;
033import org.nuxeo.runtime.metrics.MetricsService;
034import org.nuxeo.runtime.transaction.TransactionHelper;
035
036import java.util.concurrent.TimeUnit;
037
038import static java.lang.Thread.currentThread;
039import static org.nuxeo.runtime.transaction.TransactionHelper.startTransaction;
040
041/**
042 * @since 8.3
043 */
044public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer {
045
046    protected final Batch batch;
047
048    protected final String repositoryName;
049
050    protected final QueuesManager queuesManager;
051
052    protected final int queue;
053
054    protected final DocumentRef rootRef;
055
056    protected long startTime = 0;
057
058    protected long lastCheckTime = 0;
059
060    protected long lastCount = 0;
061
062    protected static final long CHECK_INTERVAL = 2000;
063
064    protected double lastImediatThroughput = 0;
065
066    protected String originatingUsername;
067
068    protected ImporterLogger log = null;
069
070    protected boolean replayMode = true;
071
072    protected String threadName;
073
074    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
075
076    protected final Timer processTimer;
077
078    protected final Timer commitTimer;
079
080    protected final Counter retryCount;
081
082    protected final Counter failCount;
083
084    protected final Counter consumerCount;
085
086    protected long lastCommitTime;
087
088    protected static final long TRANSACTION_THRESOLD_MS = 3 * 60 * 1000; // 3 min
089
090    public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, QueuesManager queuesManager, int queue) {
091        this.log = log;
092        repositoryName = root.getRepositoryName();
093        this.batch = new Batch(batchSize);
094        this.queuesManager = queuesManager;
095        this.queue = queue;
096        rootRef = root.getRef();
097
098        lastCommitTime = System.currentTimeMillis();
099        processTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "import"));
100        commitTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "commit"));
101        retryCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "retry"));
102        failCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "fail"));
103        consumerCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer"));
104
105        log.info("Create consumer root:" + root.getPathAsString() + " batchSize: " + batchSize);
106    }
107
108    @Override
109    public void run() {
110        threadName = currentThread().getName();
111        started = true;
112        startTime = System.currentTimeMillis();
113        lastCheckTime = startTime;
114        consumerCount.inc();
115        try {
116            runImport();
117        } catch (Exception e) {
118            log.error("Unexpected End of consumer after " + getNbProcessed() + " nodes.", e);
119            ExceptionUtils.checkInterrupt(e);
120            runDrainer();
121            error = e;
122        } finally {
123            completed = true;
124            started = false;
125            consumerCount.dec();
126        }
127    }
128
129    protected void runDrainer() {
130        // consumer is broken but we drain the queue to prevent blocking the producer
131        markThreadName("draining");
132        log.error("Consumer is broken, draining the queue to rejected");
133        do {
134            try {
135                SourceNode src = queuesManager.poll(queue, 1, TimeUnit.SECONDS);
136                if (src == null && canStop) {
137                    log.info("End of broken consumer, processed node: " + getNbProcessed());
138                    break;
139                } else if (src != null) {
140                    log.error("Consumer is broken reject node: " + src.getName());
141                    onSourceNodeException(src, error);
142                }
143            } catch (InterruptedException e) {
144                log.error("Interrupted exception received, stopping consumer");
145                break;
146            }
147        } while(true);
148    }
149
150    private void markThreadName(String mark) {
151        Thread.currentThread().setName(Thread.currentThread().getName() + "-" + mark);
152    }
153
154    protected void runImport() {
155
156        UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) {
157            @Override
158            public void run() {
159                log.info("Consumer running");
160                SourceNode src;
161                while (true) {
162                    try {
163                        src = queuesManager.poll(queue, 1, TimeUnit.SECONDS);
164                    } catch (InterruptedException e) {
165                        log.error("Interrupted exception received, stopping consumer");
166                        break;
167                    }
168                    if (src == null) {
169                        log.debug("Poll timeout, queue size:" + queuesManager.size(queue));
170                        if (canStop) {
171                            log.info("End of consumer, processed node: " + getNbProcessed());
172                            break;
173                        }
174                        continue;
175                    }
176                    incrementProcessed();
177                    batch.add(src);
178                    Timer.Context stopWatch = processTimer.time();
179                    try {
180                        setThreadName(src);
181                        process(session, src);
182                        restoreThreadName();
183                    } catch (Exception e) {
184                        log.error("Exception while consuming node: " + src.getName(), e);
185                        ExceptionUtils.checkInterrupt(e);
186                        TransactionHelper.setTransactionRollbackOnly();
187                    } finally {
188                        stopWatch.stop();
189                    }
190                    commitIfNeededOrReplayBatch(src);
191                }
192                commitOrReplayBatch();
193
194            }
195
196            private void restoreThreadName() {
197                currentThread().setName(threadName);
198            }
199
200            private void setThreadName(SourceNode src) {
201                String name = threadName + "-" + nbProcessed;
202                if (src != null) {
203                    name += "-" + src.getName();
204                } else {
205                    name += "-null";
206                }
207                currentThread().setName(name);
208            }
209
210            private void commitIfNeededOrReplayBatch(SourceNode lastSrc) {
211                if (TransactionHelper.isTransactionMarkedRollback()) {
212                    log.error("Transaction marked as rollback while processing node: " + lastSrc.getName());
213                    rollbackAndReplayBatch(session);
214                } else {
215                    commitIfNeeded(session);
216                }
217            }
218
219            private void commitOrReplayBatch() {
220                if (TransactionHelper.isTransactionMarkedRollback()) {
221                    rollbackAndReplayBatch(session);
222                } else {
223                    commit(session);
224                }
225            }
226
227        };
228
229        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
230            // This is needed to acquire a session
231            startTransaction();
232        }
233        runner.runUnrestricted();
234
235    }
236
237    protected abstract void process(CoreSession session, SourceNode bh) throws Exception;
238
239    /**
240     * commit if batch is full or if transaction is running for more than 5 min.
241     */
242    protected void commitIfNeeded(CoreSession session) {
243        long t = System.currentTimeMillis();
244        if (batch.isFull() || (t - lastCommitTime > TRANSACTION_THRESOLD_MS)) {
245            if (! batch.isFull()) {
246                log.warn("Force batch commit because transaction time is too long, current batch size: " + batch.size());
247            }
248            commit(session);
249            if (t - lastCheckTime > CHECK_INTERVAL) {
250                lastImediatThroughput = 1000 * (nbProcessed.get() - lastCount + 0.0) / (t - lastCheckTime);
251                lastCount = nbProcessed.get();
252                lastCheckTime = t;
253            }
254        }
255    }
256
257    protected void commit(CoreSession session) {
258        if (batch.size() > 0) {
259            Timer.Context stopWatch = commitTimer.time();
260            try {
261                log.debug("Commit batch of " + batch.size() + " nodes");
262                session.save();
263                TransactionHelper.commitOrRollbackTransaction();
264                batch.clear();
265                startTransaction();
266            } finally {
267                stopWatch.stop();
268                lastCommitTime = System.currentTimeMillis();
269            }
270
271        }
272    }
273
274    protected void rollbackAndReplayBatch(CoreSession session) {
275        log.info("Rollback a batch of " + batch.size() + " docs");
276        TransactionHelper.setTransactionRollbackOnly();
277        session.save();
278        TransactionHelper.commitOrRollbackTransaction();
279        replayBatch(session);
280        batch.clear();
281        startTransaction();
282    }
283
284    /**
285     * Replays the current batch in an isolated Transaction for each Source Node.
286     *
287     * @param session
288     * @throws InterruptedException
289     */
290    private void replayBatch(CoreSession session) {
291        if (! replayMode) {
292            log.error("No replay mode, loosing the batch");
293            return;
294        }
295        log.error("Replaying batch in isolated transaction");
296        for (SourceNode node : batch.getNodes()) {
297            boolean success = false;
298            startTransaction();
299            retryCount.inc();
300            Timer.Context stopWatch = processTimer.time();
301            try {
302                process(session, node);
303            } catch (Exception e) { // deals with interrupt below
304                ExceptionUtils.checkInterrupt(e);
305                onSourceNodeException(node, e);
306                TransactionHelper.setTransactionRollbackOnly();
307                failCount.inc();
308            } finally {
309                stopWatch.stop();
310            }
311            session.save();
312            if (TransactionHelper.isTransactionMarkedRollback()) {
313                onSourceNodeRollBack(node);
314            } else {
315                success = true;
316            }
317            TransactionHelper.commitOrRollbackTransaction();
318            if (success) {
319                log.debug("Replaying successfully node: " + node.getName());
320            } else {
321                log.error("Import failure after replay on node: " + node.getName());
322            }
323        }
324    }
325
326    /**
327     * Override if you want to do more that logging the error.
328     *
329     * @param node
330     * @param e
331     */
332    protected void onSourceNodeException(SourceNode node, Exception e) {
333        log.error(String.format("Unable to import node [%s]", node.getName()), e);
334    }
335
336    /**
337     * Override if you want to do more that logging the error.
338     *
339     * @param node
340     */
341    protected void onSourceNodeRollBack(SourceNode node) {
342        log.error(String.format("Rollback while replaying consumer node [%s]", node.getName()));
343    }
344
345    public String getOriginatingUsername() {
346        return originatingUsername;
347    }
348
349    public void setOriginatingUsername(String originatingUsername) {
350        this.originatingUsername = originatingUsername;
351    }
352
353}