001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue.consumer;
018
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.TimeUnit;
021
022import org.nuxeo.common.utils.ExceptionUtils;
023import org.nuxeo.ecm.core.api.CoreSession;
024import org.nuxeo.ecm.core.api.DocumentModel;
025import org.nuxeo.ecm.core.api.DocumentRef;
026import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
027import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
028import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner;
029import org.nuxeo.ecm.platform.importer.source.SourceNode;
030import org.nuxeo.runtime.transaction.TransactionHelper;
031
032/**
033 * @since 8.3
034 */
035public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer {
036
037    protected final Batch batch;
038
039    protected final String repositoryName;
040
041    protected final BlockingQueue<SourceNode> queue;
042
043    protected final DocumentRef rootRef;
044
045    protected long startTime = 0;
046
047    protected long lastCheckTime = 0;
048
049    protected long lastCount = 0;
050
051    protected static final long CHECK_INTERVAL = 2000;
052
053    protected double lastImediatThroughput = 0;
054
055    protected final ImportStat importStat;
056
057    protected String originatingUsername;
058
059    protected ImporterLogger log = null;
060
061    public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, BlockingQueue<SourceNode> queue) {
062        this.log = log;
063        repositoryName = root.getRepositoryName();
064        this.batch = new Batch(batchSize);
065        this.queue = queue;
066        rootRef = root.getRef();
067        importStat = new ImportStat();
068    }
069
070    @Override
071    public void run() {
072
073        started = true;
074        startTime = System.currentTimeMillis();
075        lastCheckTime = startTime;
076
077        UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) {
078            @Override
079            public void run() {
080                while (!mustStop) {
081                    try {
082                        SourceNode src = queue.poll(1, TimeUnit.SECONDS);
083                        if (src != null) {
084                            try {
085                                incrementProcessed();
086                                batch.add(src);
087                                process(session, src);
088                                commitIfNeeded(session);
089                            } catch (Exception e) {
090                                TransactionHelper.setTransactionRollbackOnly();
091                                commitIfNeeded(session);
092                            }
093                        } else {
094                            if (canStop) {
095                                commit(session);
096                                break;
097                            }
098                        }
099                    } catch (InterruptedException e) {
100                        log.warn("Interrupted exception received, stopping consumer");
101                        mustStop = true;
102                    }
103                }
104            }
105        };
106
107        TransactionHelper.startTransaction();
108        try {
109            runner.runUnrestricted();
110        } catch (Exception e) {
111            log.error("Error while running consumer.", e);
112            TransactionHelper.setTransactionRollbackOnly();
113            error = e;
114            ExceptionUtils.checkInterrupt(e);
115        } finally {
116            completed = true;
117            started = false;
118            try {
119                TransactionHelper.commitOrRollbackTransaction();
120            } catch (Exception e) {
121                log.error("Error while running consumer. Could not commit or rollback transaction", e);
122                throw e;
123            }
124        }
125    }
126
127    protected abstract void process(CoreSession session, SourceNode bh) throws Exception;
128
129    protected void commitIfNeeded(CoreSession session) {
130        if (batch.isFull()) {
131            commit(session);
132            long t = System.currentTimeMillis();
133            if (t - lastCheckTime > CHECK_INTERVAL) {
134                lastImediatThroughput = 1000 * (nbProcessed - lastCount + 0.0) / (t - lastCheckTime);
135                lastCount = nbProcessed;
136                lastCheckTime = t;
137            }
138        }
139    }
140
141    protected void commit(CoreSession session) {
142        session.save();
143        boolean rolledBack = TransactionHelper.isTransactionMarkedRollback();
144        TransactionHelper.commitOrRollbackTransaction();
145        if (rolledBack) {
146            replayBatch(session);
147        }
148        batch.clear();
149        TransactionHelper.startTransaction();
150    }
151
152    /**
153     * Replays the current batch in an isolated Transaction for each Source Node.
154     *
155     * @param session
156     * @throws InterruptedException
157     */
158    private void replayBatch(CoreSession session) {
159        log.error("Replaying batch in isolated transaction because one source node rolled back the transaction");
160        for (SourceNode node : batch.getNodes()) {
161            TransactionHelper.startTransaction();
162            try {
163                process(session, node);
164            } catch (Exception e) { // deals with interrupt below
165                ExceptionUtils.checkInterrupt(e);
166                onSourceNodeException(node, e);
167                TransactionHelper.setTransactionRollbackOnly();
168            }
169
170            if (TransactionHelper.isTransactionMarkedRollback()) {
171                onSourceNodeRollBack(node);
172            }
173            session.save();
174            TransactionHelper.commitOrRollbackTransaction();
175        }
176    }
177
178    /**
179     * Override if you want to do more that logging the error.
180     *
181     * @param node
182     * @param e
183     */
184    protected void onSourceNodeException(SourceNode node, Exception e) {
185        log.error(String.format("   Error consuming source node [%s]", node.getName()), e);
186    }
187
188    /**
189     * Override if you want to do more that logging the error.
190     *
191     * @param node
192     * @param e
193     */
194    protected void onSourceNodeRollBack(SourceNode node) {
195        log.error(String.format("   Source node [%s] did rollback the transaction", node.getName()));
196    }
197
198    @Override
199    public double getImmediateThroughput() {
200        return lastImediatThroughput;
201    }
202
203    @Override
204    public double getThroughput() {
205        return 1000 * (nbProcessed + 0.0) / (System.currentTimeMillis() + 1 - startTime);
206    }
207
208    @Override
209    public ImportStat getImportStat() {
210        return importStat;
211    }
212
213    public String getOriginatingUsername() {
214        return originatingUsername;
215    }
216
217    public void setOriginatingUsername(String originatingUsername) {
218        this.originatingUsername = originatingUsername;
219    }
220
221}