001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 */ 017package org.nuxeo.ecm.platform.importer.queue.consumer; 018 019import com.codahale.metrics.Counter; 020import com.codahale.metrics.MetricRegistry; 021import com.codahale.metrics.SharedMetricRegistries; 022import com.codahale.metrics.Timer; 023 024import org.nuxeo.common.utils.ExceptionUtils; 025import org.nuxeo.ecm.core.api.CoreSession; 026import org.nuxeo.ecm.core.api.DocumentModel; 027import org.nuxeo.ecm.core.api.DocumentRef; 028import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner; 029import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 030import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner; 031import org.nuxeo.ecm.platform.importer.source.SourceNode; 032import org.nuxeo.runtime.metrics.MetricsService; 033import org.nuxeo.runtime.transaction.TransactionHelper; 034 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.TimeUnit; 037 038import static java.lang.Thread.currentThread; 039import static org.nuxeo.runtime.transaction.TransactionHelper.startTransaction; 040 041/** 042 * @since 8.3 043 */ 044public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer { 045 046 protected final Batch batch; 047 048 protected final String repositoryName; 049 050 protected final BlockingQueue<SourceNode> queue; 051 052 protected final DocumentRef rootRef; 053 054 protected long startTime = 0; 055 056 protected long lastCheckTime = 0; 057 058 protected long lastCount = 0; 059 060 protected static final long CHECK_INTERVAL = 2000; 061 062 protected double lastImediatThroughput = 0; 063 064 protected String originatingUsername; 065 066 protected ImporterLogger log = null; 067 068 protected boolean replayMode = true; 069 070 protected String threadName; 071 072 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 073 074 protected final Timer processTimer; 075 076 protected final Timer commitTimer; 077 078 protected final Counter retryCount; 079 080 protected final Counter failCount; 081 082 protected final Counter consumerCount; 083 084 public AbstractConsumer(ImporterLogger log, DocumentModel root, int batchSize, BlockingQueue<SourceNode> queue) { 085 this.log = log; 086 repositoryName = root.getRepositoryName(); 087 this.batch = new Batch(batchSize); 088 this.queue = queue; 089 rootRef = root.getRef(); 090 091 processTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "import")); 092 commitTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "commit")); 093 retryCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "retry")); 094 failCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer", "fail")); 095 consumerCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "queue", "consumer")); 096 097 log.info("Create consumer root:" + root.getPathAsString() + " batchSize: " + batchSize); 098 } 099 100 @Override 101 public void run() { 102 threadName = currentThread().getName(); 103 started = true; 104 startTime = System.currentTimeMillis(); 105 lastCheckTime = startTime; 106 consumerCount.inc(); 107 try { 108 runImport(); 109 } catch (Exception e) { 110 log.error("Unexpected End of consumer after " + getNbProcessed() + " nodes.", e); 111 ExceptionUtils.checkInterrupt(e); 112 runDrainer(); 113 error = e; 114 } finally { 115 completed = true; 116 started = false; 117 consumerCount.dec(); 118 } 119 } 120 121 protected void runDrainer() { 122 // consumer is broken but we drain the queue to prevent blocking the producer 123 markThreadName("draining"); 124 log.error("Consumer is broken, draining the queue to rejected"); 125 do { 126 try { 127 SourceNode src = queue.poll(1, TimeUnit.SECONDS); 128 if (src == null && canStop) { 129 log.info("End of broken consumer, processed node: " + getNbProcessed()); 130 break; 131 } else if (src != null) { 132 log.error("Consumer is broken reject node: " + src.getName()); 133 onSourceNodeException(src, error); 134 } 135 } catch (InterruptedException e) { 136 log.error("Interrupted exception received, stopping consumer"); 137 break; 138 } 139 } while(true); 140 } 141 142 private void markThreadName(String mark) { 143 Thread.currentThread().setName(Thread.currentThread().getName() + "-" + mark); 144 } 145 146 protected void runImport() { 147 148 UnrestrictedSessionRunner runner = new UnrestrictedSessionRunner(repositoryName, originatingUsername) { 149 @Override 150 public void run() { 151 log.info("Consumer running"); 152 SourceNode src; 153 while (true) { 154 try { 155 src = queue.poll(1, TimeUnit.SECONDS); 156 } catch (InterruptedException e) { 157 log.error("Interrupted exception received, stopping consumer"); 158 break; 159 } 160 if (src == null) { 161 log.debug("Poll timeout, queue size:" + queue.size()); 162 if (canStop) { 163 log.info("End of consumer, processed node: " + getNbProcessed()); 164 break; 165 } 166 continue; 167 } 168 incrementProcessed(); 169 batch.add(src); 170 Timer.Context stopWatch = processTimer.time(); 171 try { 172 setThreadName(src); 173 process(session, src); 174 restoreThreadName(); 175 } catch (Exception e) { 176 log.error("Exception while consuming node: " + src.getName(), e); 177 ExceptionUtils.checkInterrupt(e); 178 TransactionHelper.setTransactionRollbackOnly(); 179 } finally { 180 stopWatch.stop(); 181 } 182 commitIfNeededOrReplayBatch(src); 183 } 184 commitOrReplayBatch(); 185 186 } 187 188 private void restoreThreadName() { 189 currentThread().setName(threadName); 190 } 191 192 private void setThreadName(SourceNode src) { 193 String name = threadName + "-" + nbProcessed; 194 if (src != null) { 195 name += "-" + src.getName(); 196 } else { 197 name += "-null"; 198 } 199 currentThread().setName(name); 200 } 201 202 private void commitIfNeededOrReplayBatch(SourceNode lastSrc) { 203 if (TransactionHelper.isTransactionMarkedRollback()) { 204 log.error("Transaction marked as rollback while processing node: " + lastSrc.getName()); 205 rollbackAndReplayBatch(session); 206 } else { 207 commitIfNeeded(session); 208 } 209 } 210 211 private void commitOrReplayBatch() { 212 if (TransactionHelper.isTransactionMarkedRollback()) { 213 rollbackAndReplayBatch(session); 214 } else { 215 commit(session); 216 } 217 } 218 219 }; 220 221 if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) { 222 // This is needed to acquire a session 223 startTransaction(); 224 } 225 runner.runUnrestricted(); 226 227 } 228 229 protected abstract void process(CoreSession session, SourceNode bh) throws Exception; 230 231 protected void commitIfNeeded(CoreSession session) { 232 if (batch.isFull()) { 233 commit(session); 234 long t = System.currentTimeMillis(); 235 if (t - lastCheckTime > CHECK_INTERVAL) { 236 lastImediatThroughput = 1000 * (nbProcessed.get() - lastCount + 0.0) / (t - lastCheckTime); 237 lastCount = nbProcessed.get(); 238 lastCheckTime = t; 239 } 240 } 241 } 242 243 protected void commit(CoreSession session) { 244 if (batch.size() > 0) { 245 Timer.Context stopWatch = commitTimer.time(); 246 try { 247 log.debug("Commit batch of " + batch.size() + " nodes"); 248 session.save(); 249 TransactionHelper.commitOrRollbackTransaction(); 250 batch.clear(); 251 startTransaction(); 252 } finally { 253 stopWatch.stop(); 254 } 255 256 } 257 } 258 259 protected void rollbackAndReplayBatch(CoreSession session) { 260 log.info("Rollback a batch of " + batch.size() + " docs"); 261 TransactionHelper.setTransactionRollbackOnly(); 262 session.save(); 263 TransactionHelper.commitOrRollbackTransaction(); 264 replayBatch(session); 265 batch.clear(); 266 startTransaction(); 267 } 268 269 /** 270 * Replays the current batch in an isolated Transaction for each Source Node. 271 * 272 * @param session 273 * @throws InterruptedException 274 */ 275 private void replayBatch(CoreSession session) { 276 if (! replayMode) { 277 log.error("No replay mode, loosing the batch"); 278 return; 279 } 280 log.error("Replaying batch in isolated transaction"); 281 for (SourceNode node : batch.getNodes()) { 282 boolean success = false; 283 startTransaction(); 284 retryCount.inc(); 285 Timer.Context stopWatch = processTimer.time(); 286 try { 287 process(session, node); 288 } catch (Exception e) { // deals with interrupt below 289 ExceptionUtils.checkInterrupt(e); 290 onSourceNodeException(node, e); 291 TransactionHelper.setTransactionRollbackOnly(); 292 failCount.inc(); 293 } finally { 294 stopWatch.stop(); 295 } 296 session.save(); 297 if (TransactionHelper.isTransactionMarkedRollback()) { 298 onSourceNodeRollBack(node); 299 } else { 300 success = true; 301 } 302 TransactionHelper.commitOrRollbackTransaction(); 303 if (success) { 304 log.debug("Replaying successfully node: " + node.getName()); 305 } else { 306 log.error("Import failure after replay on node: " + node.getName()); 307 } 308 } 309 } 310 311 /** 312 * Override if you want to do more that logging the error. 313 * 314 * @param node 315 * @param e 316 */ 317 protected void onSourceNodeException(SourceNode node, Exception e) { 318 log.error(String.format("Unable to import node [%s]", node.getName()), e); 319 } 320 321 /** 322 * Override if you want to do more that logging the error. 323 * 324 * @param node 325 */ 326 protected void onSourceNodeRollBack(SourceNode node) { 327 log.error(String.format("Rollback while replaying consumer node [%s]", node.getName())); 328 } 329 330 public String getOriginatingUsername() { 331 return originatingUsername; 332 } 333 334 public void setOriginatingUsername(String originatingUsername) { 335 this.originatingUsername = originatingUsername; 336 } 337 338}