001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue.consumer;
018
019import com.codahale.metrics.Counter;
020import com.codahale.metrics.MetricRegistry;
021import com.codahale.metrics.SharedMetricRegistries;
022import com.codahale.metrics.Timer;
023
024import org.nuxeo.common.utils.ExceptionUtils;
025import org.nuxeo.ecm.core.api.CoreSession;
026import org.nuxeo.ecm.core.api.DocumentModel;
027import org.nuxeo.ecm.core.api.DocumentRef;
028import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
029import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
030import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner;
031import org.nuxeo.ecm.platform.importer.source.SourceNode;
032import org.nuxeo.runtime.metrics.MetricsService;
033import org.nuxeo.runtime.transaction.TransactionHelper;
034
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.TimeUnit;
037
038import static java.lang.Thread.currentThread;
039import static org.nuxeo.runtime.transaction.TransactionHelper.startTransaction;
040
041/**
042 * @since 8.3
043 */
044public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer {
045
046    protected final Batch batch;
047
048    protected final String repositoryName;
049
050    protected final BlockingQueue<SourceNode> queue;
051
052    protected final DocumentRef rootRef;
053
054    protected long startTime = 0;
055
056    protected long lastCheckTime = 0;
057
058    protected long lastCount = 0;
059
060    protected static final long CHECK_INTERVAL = 2000;
061
062    protected double lastImediatThroughput = 0;
063
064    protected String originatingUsername;
065
066    protected ImporterLogger log = null;
067
068    protected boolean replayMode = true;
069
070    protected String threadName;
071
072    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
073
074    protected final Timer processTimer;
075
076    protected final Timer commitTimer;
077
078    protected final Counter retryCount;
079
080    protected final Counter failCount;
081
082    protected final Counter consumerCount;
083
084    public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, BlockingQueue<SourceNode> queue) {
085        this.log = log;
086        repositoryName = root.getRepositoryName();
087        this.batch = new Batch(batchSize);
088        this.queue = queue;
089        rootRef = root.getRef();
090
091        processTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "import"));
092        commitTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "commit"));
093        retryCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "retry"));
094        failCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "fail"));
095        consumerCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer"));
096
097        log.info("Create consumer root:" + root.getPathAsString() + " batchSize: " + batchSize);
098    }
099
100    @Override
101    public void run() {
102        threadName = currentThread().getName();
103        started = true;
104        startTime = System.currentTimeMillis();
105        lastCheckTime = startTime;
106        consumerCount.inc();
107        try {
108            runImport();
109        } catch (Exception e) {
110            log.error("Unexpected End of consumer after " + getNbProcessed() + " nodes.", e);
111            ExceptionUtils.checkInterrupt(e);
112            runDrainer();
113            error = e;
114        } finally {
115            completed = true;
116            started = false;
117            consumerCount.dec();
118        }
119    }
120
121    protected void runDrainer() {
122        // consumer is broken but we drain the queue to prevent blocking the producer
123        markThreadName("draining");
124        log.error("Consumer is broken, draining the queue to rejected");
125        do {
126            try {
127                SourceNode src = queue.poll(1, TimeUnit.SECONDS);
128                if (src == null && canStop) {
129                    log.info("End of broken consumer, processed node: " + getNbProcessed());
130                    break;
131                } else if (src != null) {
132                    log.error("Consumer is broken reject node: " + src.getName());
133                    onSourceNodeException(src, error);
134                }
135            } catch (InterruptedException e) {
136                log.error("Interrupted exception received, stopping consumer");
137                break;
138            }
139        } while(true);
140    }
141
142    private void markThreadName(String mark) {
143        Thread.currentThread().setName(Thread.currentThread().getName() + "-" + mark);
144    }
145
146    protected void runImport() {
147
148        UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) {
149            @Override
150            public void run() {
151                log.info("Consumer running");
152                SourceNode src;
153                while (true) {
154                    try {
155                        src = queue.poll(1, TimeUnit.SECONDS);
156                    } catch (InterruptedException e) {
157                        log.error("Interrupted exception received, stopping consumer");
158                        break;
159                    }
160                    if (src == null) {
161                        log.debug("Poll timeout, queue size:" + queue.size());
162                        if (canStop) {
163                            log.info("End of consumer, processed node: " + getNbProcessed());
164                            break;
165                        }
166                        continue;
167                    }
168                    incrementProcessed();
169                    batch.add(src);
170                    Timer.Context stopWatch = processTimer.time();
171                    try {
172                        setThreadName(src);
173                        process(session, src);
174                        restoreThreadName();
175                    } catch (Exception e) {
176                        log.error("Exception while consuming node: " + src.getName(), e);
177                        ExceptionUtils.checkInterrupt(e);
178                        TransactionHelper.setTransactionRollbackOnly();
179                    } finally {
180                        stopWatch.stop();
181                    }
182                    commitIfNeededOrReplayBatch(src);
183                }
184                commitOrReplayBatch();
185
186            }
187
188            private void restoreThreadName() {
189                currentThread().setName(threadName);
190            }
191
192            private void setThreadName(SourceNode src) {
193                String name = threadName + "-" + nbProcessed;
194                if (src != null) {
195                    name += "-" + src.getName();
196                } else {
197                    name += "-null";
198                }
199                currentThread().setName(name);
200            }
201
202            private void commitIfNeededOrReplayBatch(SourceNode lastSrc) {
203                if (TransactionHelper.isTransactionMarkedRollback()) {
204                    log.error("Transaction marked as rollback while processing node: " + lastSrc.getName());
205                    rollbackAndReplayBatch(session);
206                } else {
207                    commitIfNeeded(session);
208                }
209            }
210
211            private void commitOrReplayBatch() {
212                if (TransactionHelper.isTransactionMarkedRollback()) {
213                    rollbackAndReplayBatch(session);
214                } else {
215                    commit(session);
216                }
217            }
218
219        };
220
221        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
222            // This is needed to acquire a session
223            startTransaction();
224        }
225        runner.runUnrestricted();
226
227    }
228
229    protected abstract void process(CoreSession session, SourceNode bh) throws Exception;
230
231    protected void commitIfNeeded(CoreSession session) {
232        if (batch.isFull()) {
233            commit(session);
234            long t = System.currentTimeMillis();
235            if (t - lastCheckTime > CHECK_INTERVAL) {
236                lastImediatThroughput = 1000 * (nbProcessed.get() - lastCount + 0.0) / (t - lastCheckTime);
237                lastCount = nbProcessed.get();
238                lastCheckTime = t;
239            }
240        }
241    }
242
243    protected void commit(CoreSession session) {
244        if (batch.size() > 0) {
245            Timer.Context stopWatch = commitTimer.time();
246            try {
247                log.debug("Commit batch of " + batch.size() + " nodes");
248                session.save();
249                TransactionHelper.commitOrRollbackTransaction();
250                batch.clear();
251                startTransaction();
252            } finally {
253                stopWatch.stop();
254            }
255
256        }
257    }
258
259    protected void rollbackAndReplayBatch(CoreSession session) {
260        log.info("Rollback a batch of " + batch.size() + " docs");
261        TransactionHelper.setTransactionRollbackOnly();
262        session.save();
263        TransactionHelper.commitOrRollbackTransaction();
264        replayBatch(session);
265        batch.clear();
266        startTransaction();
267    }
268
269    /**
270     * Replays the current batch in an isolated Transaction for each Source Node.
271     *
272     * @param session
273     * @throws InterruptedException
274     */
275    private void replayBatch(CoreSession session) {
276        if (! replayMode) {
277            log.error("No replay mode, loosing the batch");
278            return;
279        }
280        log.error("Replaying batch in isolated transaction");
281        for (SourceNode node : batch.getNodes()) {
282            boolean success = false;
283            startTransaction();
284            retryCount.inc();
285            Timer.Context stopWatch = processTimer.time();
286            try {
287                process(session, node);
288            } catch (Exception e) { // deals with interrupt below
289                ExceptionUtils.checkInterrupt(e);
290                onSourceNodeException(node, e);
291                TransactionHelper.setTransactionRollbackOnly();
292                failCount.inc();
293            } finally {
294                stopWatch.stop();
295            }
296            session.save();
297            if (TransactionHelper.isTransactionMarkedRollback()) {
298                onSourceNodeRollBack(node);
299            } else {
300                success = true;
301            }
302            TransactionHelper.commitOrRollbackTransaction();
303            if (success) {
304                log.debug("Replaying successfully node: " + node.getName());
305            } else {
306                log.error("Import failure after replay on node: " + node.getName());
307            }
308        }
309    }
310
311    /**
312     * Override if you want to do more that logging the error.
313     *
314     * @param node
315     * @param e
316     */
317    protected void onSourceNodeException(SourceNode node, Exception e) {
318        log.error(String.format("Unable to import node [%s]", node.getName()), e);
319    }
320
321    /**
322     * Override if you want to do more that logging the error.
323     *
324     * @param node
325     */
326    protected void onSourceNodeRollBack(SourceNode node) {
327        log.error(String.format("Rollback while replaying consumer node [%s]", node.getName()));
328    }
329
330    public String getOriginatingUsername() {
331        return originatingUsername;
332    }
333
334    public void setOriginatingUsername(String originatingUsername) {
335        this.originatingUsername = originatingUsername;
336    }
337
338}