001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 */ 017package org.nuxeo.ecm.platform.importer.queue.consumer; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.TimeUnit; 021 022import org.nuxeo.common.utils.ExceptionUtils; 023import org.nuxeo.ecm.core.api.CoreSession; 024import org.nuxeo.ecm.core.api.DocumentModel; 025import org.nuxeo.ecm.core.api.DocumentRef; 026import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner; 027import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 028import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner; 029import org.nuxeo.ecm.platform.importer.source.SourceNode; 030import org.nuxeo.runtime.transaction.TransactionHelper; 031 032/** 033 * @since 8.3 034 */ 035public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer { 036 037 protected final Batch batch; 038 039 protected final String repositoryName; 040 041 protected final BlockingQueue<SourceNode> queue; 042 043 protected final DocumentRef rootRef; 044 045 protected long startTime = 0; 046 047 protected long lastCheckTime = 0; 048 049 protected long lastCount = 0; 050 051 protected static final long CHECK_INTERVAL = 2000; 052 053 protected double lastImediatThroughput = 0; 054 055 protected final ImportStat importStat; 056 057 protected String originatingUsername; 058 059 protected ImporterLogger log = null; 060 061 public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, BlockingQueue<SourceNode> queue) { 062 this.log = log; 063 repositoryName = root.getRepositoryName(); 064 this.batch = new Batch(batchSize); 065 this.queue = queue; 066 rootRef = root.getRef(); 067 importStat = new ImportStat(); 068 } 069 070 @Override 071 public void run() { 072 073 started = true; 074 startTime = System.currentTimeMillis(); 075 lastCheckTime = startTime; 076 077 UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) { 078 @Override 079 public void run() { 080 while (!mustStop) { 081 try { 082 SourceNode src = queue.poll(1, TimeUnit.SECONDS); 083 if (src != null) { 084 try { 085 incrementProcessed(); 086 batch.add(src); 087 process(session, src); 088 commitIfNeeded(session); 089 } catch (Exception e) { 090 TransactionHelper.setTransactionRollbackOnly(); 091 commitIfNeeded(session); 092 } 093 } else { 094 if (canStop) { 095 commit(session); 096 break; 097 } 098 } 099 } catch (InterruptedException e) { 100 log.warn("Interrupted exception received, stopping consumer"); 101 mustStop = true; 102 } 103 } 104 } 105 }; 106 107 TransactionHelper.startTransaction(); 108 try { 109 runner.runUnrestricted(); 110 } catch (Exception e) { 111 log.error("Error while running consumer.", e); 112 TransactionHelper.setTransactionRollbackOnly(); 113 error = e; 114 ExceptionUtils.checkInterrupt(e); 115 } finally { 116 completed = true; 117 started = false; 118 try { 119 TransactionHelper.commitOrRollbackTransaction(); 120 } catch (Exception e) { 121 log.error("Error while running consumer. Could not commit or rollback transaction", e); 122 throw e; 123 } 124 } 125 } 126 127 protected abstract void process(CoreSession session, SourceNode bh) throws Exception; 128 129 protected void commitIfNeeded(CoreSession session) { 130 if (batch.isFull()) { 131 commit(session); 132 long t = System.currentTimeMillis(); 133 if (t - lastCheckTime > CHECK_INTERVAL) { 134 lastImediatThroughput = 1000 * (nbProcessed - lastCount + 0.0) / (t - lastCheckTime); 135 lastCount = nbProcessed; 136 lastCheckTime = t; 137 } 138 } 139 } 140 141 protected void commit(CoreSession session) { 142 session.save(); 143 boolean rolledBack = TransactionHelper.isTransactionMarkedRollback(); 144 TransactionHelper.commitOrRollbackTransaction(); 145 if (rolledBack) { 146 replayBatch(session); 147 } 148 batch.clear(); 149 TransactionHelper.startTransaction(); 150 } 151 152 /** 153 * Replays the current batch in an isolated Transaction for each Source Node. 154 * 155 * @param session 156 * @throws InterruptedException 157 */ 158 private void replayBatch(CoreSession session) { 159 log.error("Replaying batch in isolated transaction because one source node rolled back the transaction"); 160 for (SourceNode node : batch.getNodes()) { 161 TransactionHelper.startTransaction(); 162 try { 163 process(session, node); 164 } catch (Exception e) { // deals with interrupt below 165 ExceptionUtils.checkInterrupt(e); 166 onSourceNodeException(node, e); 167 TransactionHelper.setTransactionRollbackOnly(); 168 } 169 170 if (TransactionHelper.isTransactionMarkedRollback()) { 171 onSourceNodeRollBack(node); 172 } 173 session.save(); 174 TransactionHelper.commitOrRollbackTransaction(); 175 } 176 } 177 178 /** 179 * Override if you want to do more that logging the error. 180 * 181 * @param node 182 * @param e 183 */ 184 protected void onSourceNodeException(SourceNode node, Exception e) { 185 log.error(String.format(" Error consuming source node [%s]", node.getName()), e); 186 } 187 188 /** 189 * Override if you want to do more that logging the error. 190 * 191 * @param node 192 * @param e 193 */ 194 protected void onSourceNodeRollBack(SourceNode node) { 195 log.error(String.format(" Source node [%s] did rollback the transaction", node.getName())); 196 } 197 198 @Override 199 public double getImmediateThroughput() { 200 return lastImediatThroughput; 201 } 202 203 @Override 204 public double getThroughput() { 205 return 1000 * (nbProcessed + 0.0) / (System.currentTimeMillis() + 1 - startTime); 206 } 207 208 @Override 209 public ImportStat getImportStat() { 210 return importStat; 211 } 212 213 public String getOriginatingUsername() { 214 return originatingUsername; 215 } 216 217 public void setOriginatingUsername(String originatingUsername) { 218 this.originatingUsername = originatingUsername; 219 } 220 221}