/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.nuxeo.ecm.platform.importer.queue;

import org.joda.time.DateTime;
import org.joda.time.Minutes;
import org.joda.time.Seconds;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
import org.nuxeo.ecm.platform.importer.source.SourceNode;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;


/**
 * Drives a queue-based import: one producer thread feeds the queues handed out by a {@link QueuesManager} and one
 * consumer thread per queue drains them. {@link #importDocuments} blocks until the producer and all consumers are
 * done, then logs a summary.
 * <p>
 * Counters ({@link #processedNodesConsumer}, {@link #unprocessedNodesConsumer}, {@link #nbDocsCreated}) are plain
 * fields written by the thread calling {@link #importDocuments}; only {@link #isRunning} is volatile.
 *
 * @since 8.3
 */
public class QueueImporter {

    protected ImporterLogger log = null;

    /** Sum of {@code getNbProcessed()} over the consumers of the last completed import. */
    protected long processedNodesConsumer = 0L;

    /** Number of nodes found left in the consumer queues after the consumers ended. */
    protected long unprocessedNodesConsumer = 0L;

    /** Sum of {@code getNbDocsCreated()} over the consumers of the last completed import. */
    protected long nbDocsCreated = 0L;

    protected volatile boolean isRunning = false;

    protected final ImportStat importStat = new ImportStat();

    protected final List<ImporterFilter> filters = new ArrayList<>();

    protected final List<Thread> consumerThreads = new ArrayList<>();

    protected Thread producerThread;

    /**
     * @param log logger receiving every progress and error message of the import
     */
    public QueueImporter(ImporterLogger log) {
        this.log = log;
    }

    /**
     * Runs a complete import and blocks until it is finished.
     * <p>
     * Sequence: run the "before import" filters, open a system session on the repository, initialise and start the
     * producer, start one consumer per queue, wait for the producer, tell consumers they can stop, wait for them,
     * report nodes left in the queues, aggregate statistics. The "after import" filters always run and receive the
     * first failure observed (or {@code null}).
     *
     * @param producer source of the nodes to import
     * @param manager provides the queues shared between producer and consumers
     * @param importPath repository path of the document handed to the consumers as import root
     * @param repositoryName repository on which the system session is opened
     * @param batchSize forwarded as-is to {@link ConsumerFactory#createConsumer}
     * @param factory creates one consumer per queue
     */
    public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName,
            int batchSize, ConsumerFactory factory) {
        log.info("importer: Starting import process");
        isRunning = true;
        Exception finalException = null;
        DateTime importStarted = new DateTime();

        enableFilters();
        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
            producer.init(manager);
            DocumentModel root = session.getDocument(new PathRef(importPath));
            startProducerThread(producer);
            List<Consumer> consumers = startConsumerPool(manager, root, batchSize, factory);
            finalException = waitForProducer(producer);
            consumersCanStop(consumers);
            Exception consumerException = waitForConsumers(consumers);
            // Keep the producer failure when there is one: it must not be replaced by a null (or later)
            // consumer result before being handed to the filters.
            if (finalException == null) {
                finalException = consumerException;
            }
            checkConsumerQueues(manager);
            updateStats(consumers, producer);
        } catch (Exception e) {
            log.error("Error while importing", e);
            finalException = e;
        } finally {
            disableFilters(finalException);
            isRunning = false;
        }

        DateTime importFinished = new DateTime();
        // Seconds are truncated, so a sub-second import yields 0; clamp to 1 to avoid printing Infinity/NaN.
        int elapsedSeconds = Math.max(1, Seconds.secondsBetween(importStarted, importFinished).getSeconds());
        log.info(String.format("import: End of process: producer send %d docs, consumer receive %d docs, creating %d docs (include retries) in %s mn, rate %.2f doc/s.",
                producer.getNbProcessed(), processedNodesConsumer, nbDocsCreated,
                Minutes.minutesBetween(importStarted, importFinished).getMinutes(),
                processedNodesConsumer / (float) elapsedSeconds));
    }

    /**
     * Counts and drains whatever is still sitting in the consumer queues, logging each leftover node as an error.
     * Resets {@link #unprocessedNodesConsumer} before counting.
     *
     * @param manager provides the queues to inspect
     */
    protected void checkConsumerQueues(QueuesManager manager) {
        unprocessedNodesConsumer = 0;
        for (int i = 0; i < manager.getNBConsumers(); i++) {
            BlockingQueue<SourceNode> queue = manager.getQueue(i);
            if (queue.isEmpty()) {
                continue;
            }
            log.error("Queue of conusmer " + i + " not empty, draining " + queue.size() + " nodes to errors");
            unprocessedNodesConsumer += queue.size();
            // poll() returns null only once the queue is empty (BlockingQueue holds no null elements).
            SourceNode node;
            while ((node = queue.poll()) != null) {
                log.error("Unable to import " + node.getName() + " by consumer " + i);
            }
        }
    }

    /**
     * Aggregates the per-consumer counters and logs an error when nodes were left unprocessed or when producer and
     * consumer counts disagree.
     *
     * @param consumers the consumers of the import that just ended
     * @param producer the producer of the import that just ended
     */
    private void updateStats(List<Consumer> consumers, Producer producer) {
        // Both totals describe the current run only; without resetting processedNodesConsumer a reused
        // importer would add up several runs and report lost nodes by mistake.
        processedNodesConsumer = 0;
        nbDocsCreated = 0;
        for (Consumer c : consumers) {
            processedNodesConsumer += c.getNbProcessed();
            nbDocsCreated += c.getNbDocsCreated();
        }
        if (unprocessedNodesConsumer > 0) {
            log.error("Total number of unprocessed doc because of consumers unexpected end: " + unprocessedNodesConsumer);
        }
        if (producer.getNbProcessed() != processedNodesConsumer) {
            log.error(
                    String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost",
                            producer.getNbProcessed(), processedNodesConsumer));
        }

    }

    /**
     * Polls every 100 ms until all consumers report termination or no consumer thread is alive any more, then joins
     * the consumer threads and clears {@link #consumerThreads}. Consumers that are still not terminated when the
     * wait ends are asked to stop through {@code mustStop()}.
     *
     * @param consumers the consumers to wait for
     * @return the {@link InterruptedException} caught while waiting, or {@code null}.
     *         NOTE(review): {@code ExceptionUtils.checkInterrupt} presumably restores the interrupt flag and may
     *         rethrow, in which case nothing is returned - confirm against its Javadoc.
     */
    private Exception waitForConsumers(List<Consumer> consumers) {
        Exception ret = null;
        try {
            // The liveness check mirrors waitForProducer: a consumer whose thread died without flagging
            // termination must not keep the importer waiting forever.
            while (!consumersTerminated(consumers) && anyConsumerThreadAlive()) {
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("importer: Got an InterruptedException", e);
            ret = e;
            ExceptionUtils.checkInterrupt(e);
        } finally {
            for (Consumer consumer : consumers) {
                if (!consumer.isTerminated()) {
                    log.warn("Forcibly stopping consumer");
                    consumer.mustStop();
                }
            }
        }
        log.info("importer: All consumers has terminated their work.");
        for (Thread thread : consumerThreads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                log.error("importer: Got an InterruptedException", e);
                ExceptionUtils.checkInterrupt(e);
            }
        }
        log.info("importer: All consumers threads terminated");
        consumerThreads.clear();

        // long accumulator: a compound assignment into an int would silently narrow a wider counter.
        long processed = 0;
        for (Consumer consumer : consumers) {
            processed += consumer.getNbProcessed();
        }
        log.info("importer: " + consumers.size() + " consumers terminated, processed: " + processed );

        return ret;
    }

    /** Tells whether at least one thread registered in {@link #consumerThreads} is still alive. */
    private boolean anyConsumerThreadAlive() {
        for (Thread thread : consumerThreads) {
            if (thread.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /** Signals every consumer, through {@code canStop()}, that the producer is done. */
    private void consumersCanStop(List<Consumer> consumers) {
        consumers.forEach(TaskRunner::canStop);
    }

    /**
     * Polls every 100 ms until the producer thread is dead or the producer reports termination, then joins the
     * thread and clears {@link #producerThread}. A producer that is still not terminated afterwards is asked to stop
     * through {@code mustStop()}.
     *
     * @param producer the running producer
     * @return the {@link InterruptedException} caught while waiting, or {@code null}.
     *         NOTE(review): {@code ExceptionUtils.checkInterrupt} is called before the exception is recorded; if it
     *         rethrows, the exception propagates instead of being returned - confirm.
     */
    protected Exception waitForProducer(Producer producer) {
        Exception ret = null;
        try {
            while (producerThread.isAlive() && !producer.isTerminated()) {
                Thread.sleep(100);
            }
            log.info("importer: producer terminated its work");
            producerThread.join();
            log.info("importer: producer thread terminated");
            producerThread = null;
        } catch (InterruptedException e) {
            log.error("importer: Got an InterruptedException", e);
            ExceptionUtils.checkInterrupt(e);
            ret = e;
        } finally {
            if (!producer.isTerminated()) {
                log.warn("Forcibly stopping producer");
                producer.mustStop();
            }
        }
        log.info("importer: producer terminated processed: " + producer.getNbProcessed());
        return ret;
    }

    /**
     * @param consumers the consumers to inspect
     * @return {@code true} when every consumer reports {@code isTerminated()} (also for an empty list)
     */
    protected boolean consumersTerminated(List<Consumer> consumers) {
        for (Consumer c : consumers) {
            if (!c.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates and starts one consumer thread per queue, named {@code import-Consumer<i>}, and records the threads in
     * {@link #consumerThreads}.
     *
     * @param manager provides the number of consumers and their queues
     * @param root document handed to each consumer as import root
     * @param batchSize forwarded to the factory
     * @param factory creates the consumers
     * @return the created consumers, in queue order
     */
    private List<Consumer> startConsumerPool(QueuesManager manager, DocumentModel root, int batchSize, ConsumerFactory factory) {
        ArrayList<Consumer> ret = new ArrayList<>(manager.getNBConsumers());
        for (int i = 0; i < manager.getNBConsumers(); i++) {
            final Consumer c = factory.createConsumer(log, root, batchSize, manager.getQueue(i));
            ret.add(c);
            Thread ct = new Thread(c);
            ct.setName("import-Consumer" + i);
            ct.setUncaughtExceptionHandler((t, e) -> {
                log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e);
                // Same handling as the producer: actually request the stop that the message announces.
                c.mustStop();
            });
            ct.start();
            consumerThreads.add(ct);
        }
        return ret;
    }

    /**
     * Starts the producer in a thread named {@code import-Producer} and stores it in {@link #producerThread}. An
     * uncaught exception in that thread is logged and the producer is asked to stop.
     *
     * @param producer the producer to run
     */
    protected void startProducerThread(final Producer producer) {
        Thread p = new Thread(producer);
        p.setName("import-Producer");
        p.setUncaughtExceptionHandler((t, e) -> {
            log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e);
            producer.mustStop();
        });
        p.start();
        producerThread = p;
    }

    /**
     * @return the statistics holder created with this importer
     */
    public ImportStat getImportStat() {
        return importStat;
    }

    /**
     * Registers a filter whose callbacks run before and after each import.
     *
     * @param filter the filter to add
     */
    public void addFilter(ImporterFilter filter) {
        log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter,
                hashCode()));
        filters.add(filter);
    }

    /**
     * @return the number of documents created, as aggregated at the end of the last completed import
     */
    public long getCreatedDocsCounter() {
        return nbDocsCreated;
    }

    /** Calls {@code handleBeforeImport()} on every registered filter, in registration order. */
    protected void enableFilters() {
        for (ImporterFilter filter : filters) {
            log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", filter,
                    hashCode()));
            filter.handleBeforeImport();
        }
        if (filters.isEmpty()) {
            log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode()));
        }
    }

    /**
     * Calls {@code handleAfterImport} on every registered filter, in registration order.
     *
     * @param finalException the failure that ended the import, or {@code null} when none was recorded
     */
    protected void disableFilters(Exception finalException) {
        for (ImporterFilter filter : filters) {
            filter.handleAfterImport(finalException);
        }

    }

    /**
     * @return {@code true} from the start of {@link #importDocuments} until its filters have been disabled
     */
    public boolean isRunning() {
        return isRunning;
    }

}