/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.nuxeo.ecm.platform.importer.queue.consumer;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner;
import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.transaction.TransactionHelper;

import java.util.concurrent.TimeUnit;

import static java.lang.Thread.currentThread;
import static org.nuxeo.runtime.transaction.TransactionHelper.startTransaction;

/**
 * Base consumer that polls {@link SourceNode}s from one queue of a {@link QueuesManager}, hands each of them to
 * {@link #process(CoreSession, SourceNode)} and commits the work in batches.
 * <p>
 * When the transaction of a batch is marked rollback-only, the batch is rolled back and (if {@link #replayMode} is
 * set) every node of the batch is replayed in its own transaction. When the import loop itself fails, the consumer
 * keeps draining its queue so that polled nodes are reported through
 * {@link #onSourceNodeException(SourceNode, Exception)}.
 *
 * @since 8.3
 */
public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer {

    /** Nodes processed since the last commit. */
    protected final Batch batch;

    protected final String repositoryName;

    protected final QueuesManager queuesManager;

    /** Index of the queue this consumer polls from {@link #queuesManager}. */
    protected final int queue;

    protected final DocumentRef rootRef;

    protected long startTime = 0;

    protected long lastCheckTime = 0;

    protected long lastCount = 0;

    /** Minimum delay, in milliseconds, between two throughput computations. */
    protected static final long CHECK_INTERVAL = 2000;

    /** Number of nodes processed per second, measured at the last throughput check. */
    protected double lastImediatThroughput = 0;

    protected String originatingUsername;

    protected ImporterLogger log = null;

    /** When {@code false}, a rolled back batch is dropped instead of being replayed node by node. */
    protected boolean replayMode = true;

    /** Name of the consumer thread captured when {@link #run()} starts. */
    protected String threadName;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    protected final Timer processTimer;

    protected final Timer commitTimer;

    protected final Counter retryCount;

    protected final Counter failCount;

    protected final Counter consumerCount;

    protected long lastCommitTime;

    /** Maximum time, in milliseconds, a batch stays uncommitted before a commit is forced. */
    protected static final long TRANSACTION_THRESOLD_MS = 3 * 60 * 1000; // 3 min

    /**
     * Creates a consumer bound to one queue.
     *
     * @param log the importer logger used for all messages
     * @param root the document providing the repository name and the root reference
     * @param batchSize the size given to the {@link Batch} holding the nodes of the current transaction
     * @param queuesManager the manager owning the queue to poll
     * @param queue the index of the queue to poll
     */
    public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, QueuesManager queuesManager,
            int queue) {
        this.log = log;
        repositoryName = root.getRepositoryName();
        this.batch = new Batch(batchSize);
        this.queuesManager = queuesManager;
        this.queue = queue;
        rootRef = root.getRef();

        lastCommitTime = System.currentTimeMillis();
        processTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "import"));
        commitTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "commit"));
        retryCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "retry"));
        failCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "fail"));
        consumerCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer"));

        log.info("Create consumer root:" + root.getPathAsString() + " batchSize: " + batchSize);
    }

    /**
     * Runs the import loop; if it fails with a non-interrupt exception, records the failure and drains the queue.
     */
    @Override
    public void run() {
        threadName = currentThread().getName();
        started = true;
        startTime = System.currentTimeMillis();
        lastCheckTime = startTime;
        consumerCount.inc();
        try {
            runImport();
        } catch (Exception e) {
            log.error("Unexpected End of consumer after " + getNbProcessed() + " nodes.", e);
            ExceptionUtils.checkInterrupt(e);
            // Record the failure before draining: runDrainer() reports it for every rejected node.
            error = e;
            runDrainer();
        } finally {
            completed = true;
            started = false;
            consumerCount.dec();
        }
    }

    /**
     * Empties the queue of a broken consumer, reporting each polled node through
     * {@link #onSourceNodeException(SourceNode, Exception)}. Stops once a poll times out while {@code canStop} is
     * set, or when the thread is interrupted.
     */
    protected void runDrainer() {
        // consumer is broken but we drain the queue to prevent blocking the producer
        markThreadName("draining");
        log.error("Consumer is broken, draining the queue to rejected");
        while (true) {
            try {
                SourceNode src = queuesManager.poll(queue, 1, TimeUnit.SECONDS);
                if (src == null && canStop) {
                    log.info("End of broken consumer, processed node: " + getNbProcessed());
                    break;
                } else if (src != null) {
                    log.error("Consumer is broken reject node: " + src.getName());
                    onSourceNodeException(src, error);
                }
            } catch (InterruptedException e) {
                log.error("Interrupted exception received, stopping consumer");
                // keep the interrupt status visible to whoever runs this thread
                currentThread().interrupt();
                break;
            }
        }
    }

    /** Appends {@code -mark} to the name of the current thread. */
    private void markThreadName(String mark) {
        Thread.currentThread().setName(Thread.currentThread().getName() + "-" + mark);
    }

    /**
     * Opens an unrestricted session and consumes the queue until a poll times out while {@code canStop} is set, or
     * until the thread is interrupted while polling. The pending batch is committed (or replayed) before returning.
     */
    protected void runImport() {

        UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) {
            @Override
            public void run() {
                log.info("Consumer running");
                boolean interrupted = false;
                SourceNode src;
                try {
                    while (true) {
                        try {
                            src = queuesManager.poll(queue, 1, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            log.error("Interrupted exception received, stopping consumer");
                            interrupted = true;
                            break;
                        }
                        if (src == null) {
                            log.debug("Poll timeout, queue size:" + queuesManager.size(queue));
                            if (canStop) {
                                log.info("End of consumer, processed node: " + getNbProcessed());
                                break;
                            }
                            continue;
                        }
                        incrementProcessed();
                        batch.add(src);
                        Timer.Context stopWatch = processTimer.time();
                        try {
                            setThreadName(src);
                            process(session, src);
                            restoreThreadName();
                        } catch (Exception e) {
                            log.error("Exception while consuming node: " + src.getName(), e);
                            ExceptionUtils.checkInterrupt(e);
                            TransactionHelper.setTransactionRollbackOnly();
                        } finally {
                            stopWatch.stop();
                        }
                        commitIfNeededOrReplayBatch(src);
                    }
                    commitOrReplayBatch();
                } finally {
                    if (interrupted) {
                        // Restored only after the final commit/replay so that work runs without a pending
                        // interrupt, while the caller can still observe the interruption.
                        currentThread().interrupt();
                    }
                }
            }

            private void restoreThreadName() {
                currentThread().setName(threadName);
            }

            private void setThreadName(SourceNode src) {
                String name = threadName + "-" + nbProcessed;
                if (src != null) {
                    name += "-" + src.getName();
                } else {
                    name += "-null";
                }
                currentThread().setName(name);
            }

            private void commitIfNeededOrReplayBatch(SourceNode lastSrc) {
                if (TransactionHelper.isTransactionMarkedRollback()) {
                    log.error("Transaction marked as rollback while processing node: " + lastSrc.getName());
                    rollbackAndReplayBatch(session);
                } else {
                    commitIfNeeded(session);
                }
            }

            private void commitOrReplayBatch() {
                if (TransactionHelper.isTransactionMarkedRollback()) {
                    rollbackAndReplayBatch(session);
                } else {
                    commit(session);
                }
            }

        };

        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            // This is needed to acquire a session
            startTransaction();
        }
        runner.runUnrestricted();

    }

    /**
     * Imports one source node using the given session.
     *
     * @param session the session to import with
     * @param bh the node to import
     * @throws Exception on import failure; the caller then marks the current transaction rollback-only
     */
    protected abstract void process(CoreSession session, SourceNode bh) throws Exception;

    /**
     * Commits if the batch is full or if the last commit is older than {@link #TRANSACTION_THRESOLD_MS} (3 min).
     * After a commit, the throughput is refreshed when more than {@link #CHECK_INTERVAL} ms elapsed since the last
     * check.
     */
    protected void commitIfNeeded(CoreSession session) {
        long t = System.currentTimeMillis();
        if (batch.isFull() || (t - lastCommitTime > TRANSACTION_THRESOLD_MS)) {
            if (!batch.isFull()) {
                log.warn("Force batch commit because transaction time is too long, current batch size: "
                        + batch.size());
            }
            commit(session);
            if (t - lastCheckTime > CHECK_INTERVAL) {
                lastImediatThroughput = 1000 * (nbProcessed.get() - lastCount + 0.0) / (t - lastCheckTime);
                lastCount = nbProcessed.get();
                lastCheckTime = t;
            }
        }
    }

    /**
     * Saves the session, commits the current transaction, clears the batch and starts a new transaction. Does
     * nothing when the batch is empty.
     */
    protected void commit(CoreSession session) {
        if (batch.size() > 0) {
            Timer.Context stopWatch = commitTimer.time();
            try {
                log.debug("Commit batch of " + batch.size() + " nodes");
                session.save();
                TransactionHelper.commitOrRollbackTransaction();
                batch.clear();
                startTransaction();
            } finally {
                stopWatch.stop();
                lastCommitTime = System.currentTimeMillis();
            }

        }
    }

    /**
     * Rolls back the current transaction, replays the batch node by node, then clears the batch and starts a new
     * transaction.
     */
    protected void rollbackAndReplayBatch(CoreSession session) {
        log.info("Rollback a batch of " + batch.size() + " docs");
        TransactionHelper.setTransactionRollbackOnly();
        session.save();
        TransactionHelper.commitOrRollbackTransaction();
        replayBatch(session);
        batch.clear();
        startTransaction();
    }

    /**
     * Replays the current batch in an isolated Transaction for each Source Node. Does nothing but log when
     * {@link #replayMode} is disabled.
     *
     * @param session the session used to process each node again
     */
    private void replayBatch(CoreSession session) {
        if (!replayMode) {
            log.error("No replay mode, loosing the batch");
            return;
        }
        log.error("Replaying batch in isolated transaction");
        for (SourceNode node : batch.getNodes()) {
            boolean success = false;
            startTransaction();
            retryCount.inc();
            Timer.Context stopWatch = processTimer.time();
            try {
                process(session, node);
            } catch (Exception e) { // deals with interrupt below
                ExceptionUtils.checkInterrupt(e);
                onSourceNodeException(node, e);
                TransactionHelper.setTransactionRollbackOnly();
                failCount.inc();
            } finally {
                stopWatch.stop();
            }
            session.save();
            if (TransactionHelper.isTransactionMarkedRollback()) {
                onSourceNodeRollBack(node);
            } else {
                success = true;
            }
            TransactionHelper.commitOrRollbackTransaction();
            if (success) {
                log.debug("Replaying successfully node: " + node.getName());
            } else {
                log.error("Import failure after replay on node: " + node.getName());
            }
        }
    }

    /**
     * Override if you want to do more than logging the error.
     *
     * @param node the node that could not be imported
     * @param e the failure reported for this node
     */
    protected void onSourceNodeException(SourceNode node, Exception e) {
        log.error(String.format("Unable to import node [%s]", node.getName()), e);
    }

    /**
     * Override if you want to do more than logging the error.
     *
     * @param node the node whose replay transaction was marked rollback-only
     */
    protected void onSourceNodeRollBack(SourceNode node) {
        log.error(String.format("Rollback while replaying consumer node [%s]", node.getName()));
    }

    public String getOriginatingUsername() {
        return originatingUsername;
    }

    public void setOriginatingUsername(String originatingUsername) {
        this.originatingUsername = originatingUsername;
    }

}