/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.nuxeo.ecm.platform.importer.queue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.joda.time.DateTime;
import org.joda.time.Minutes;
import org.joda.time.Seconds;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
import org.nuxeo.ecm.platform.importer.source.SourceNode;

/**
 * Drives a queue based import: producers are run one after the other on a dedicated thread and push nodes into the
 * queues of a {@link QueuesManager}, while one consumer thread per queue is started to process them.
 * <p>
 * Registered {@link ImporterFilter}s are notified before the import starts and after it ends.
 *
 * @since 8.3
 */
public class QueueImporter {

    protected ImporterLogger log = null;

    /** Sum of {@code getNbProcessed()} over all consumers of the last import. */
    protected long processedNodesConsumer = 0L;

    /** Number of nodes found in the queues after the consumers ended, during the last import. */
    protected long unprocessedNodesConsumer = 0L;

    /** Sum of {@code getNbDocsCreated()} over all consumers of the last import. */
    protected long nbDocsCreated = 0L;

    /** {@code true} from the start of {@code importDocuments} until its {@code finally} block has run. */
    protected volatile boolean isRunning = false;

    /** Exposed through {@link #getImportStat()}; not updated by the code of this class. */
    protected final ImportStat importStat = new ImportStat();

    protected final List<ImporterFilter> filters = new ArrayList<>();

    /** Threads started by the consumer pool; joined and cleared once the consumers have ended. */
    protected final List<Thread> consumerThreads = new ArrayList<>();

    /** Thread of the producer currently running, or {@code null} once it has been joined. */
    protected Thread producerThread;

    /**
     * @param log the logger receiving every message emitted by this importer
     */
    public QueueImporter(ImporterLogger log) {
        this.log = log;
    }

    /**
     * Runs an import with a single producer.
     *
     * @see #importDocuments(List, QueuesManager, String, String, int, ConsumerFactory)
     */
    public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName,
            int batchSize, ConsumerFactory factory) {
        importDocuments(Collections.singletonList(producer), manager, importPath, repositoryName, batchSize, factory);
    }

    /**
     * Runs an import: starts the consumer pool, runs every producer sequentially, lets the consumers finish, drains
     * what is left in the queues and logs a summary.
     * <p>
     * Exceptions are not propagated to the caller: they are logged and handed to the filters through
     * {@link ImporterFilter#handleAfterImport(Exception)}. When several steps fail, the first failure is the one
     * reported, unless an exception is thrown out of the import sequence itself, in which case that one wins.
     *
     * @param producers the producers to run, in order
     * @param manager the queues shared by producers and consumers
     * @param importPath repository path of the document passed to the consumers as root
     * @param repositoryName repository in which the system session is opened
     * @param batchSize batch size handed to the consumer factory
     * @param factory creates one consumer per queue
     */
    public void importDocuments(List<Producer> producers, QueuesManager manager, String importPath,
            String repositoryName, int batchSize, ConsumerFactory factory) {
        log.info("importer: Starting import process");
        isRunning = true;
        Exception finalException = null;
        DateTime importStarted = new DateTime();
        long processed = 0;
        enableFilters();
        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
            // NOTE(review): only the first producer is initialized with the manager here; the following ones are
            // started as-is. Confirm callers initialize them, otherwise init(manager) is missing in the loop below.
            Producer firstProducer = producers.get(0);
            firstProducer.init(manager);
            DocumentModel root = session.getDocument(new PathRef(importPath));
            startProducerThread(firstProducer);
            List<Consumer> consumers = startConsumerPool(manager, root, batchSize, factory);
            finalException = waitForProducer(firstProducer);
            processed = firstProducer.getNbProcessed();
            for (int i = 1; i < producers.size(); i++) {
                Producer nextProducer = producers.get(i);
                startProducerThread(nextProducer);
                // keep the first failure instead of letting a later successful step reset it to null
                finalException = firstNonNull(finalException, waitForProducer(nextProducer));
                processed += nextProducer.getNbProcessed();
            }
            consumersCanStop(consumers);
            finalException = firstNonNull(finalException, waitForConsumers(consumers));
            checkConsumerQueues(manager);
            updateStats(consumers, producers);
        } catch (Exception e) {
            log.error("Error while importing", e);
            finalException = e;
        } finally {
            disableFilters(finalException);
            isRunning = false;
        }

        DateTime importFinished = new DateTime();
        // an import shorter than one second would otherwise divide by zero and print Infinity or NaN
        int elapsedSeconds = Math.max(1, Seconds.secondsBetween(importStarted, importFinished).getSeconds());
        log.info(String.format("import: End of process: producer send %d docs, consumer receive %d docs, creating %d docs (include retries) in %s mn, rate %.2f doc/s.",
                processed, processedNodesConsumer, nbDocsCreated,
                Minutes.minutesBetween(importStarted, importFinished).getMinutes(),
                processedNodesConsumer / (float) elapsedSeconds));
    }

    /**
     * Empties every queue once the consumers have ended, logging each node that was left behind and counting them
     * in {@link #unprocessedNodesConsumer}.
     */
    protected void checkConsumerQueues(QueuesManager manager) {
        unprocessedNodesConsumer = 0;
        for (int i = 0; i < manager.count(); i++) {
            while (!manager.isEmpty(i)) {
                log.error("Queue of conusmer " + i + " not empty, draining " + manager.size(i) + " nodes to errors");
                unprocessedNodesConsumer += manager.size(i);
                do {
                    SourceNode node = manager.poll(i);
                    if (node != null) {
                        log.error("Unable to import " + node.getName() + " by consumer " + i);
                    }
                    // drain until the queue is empty; looping while it IS empty would never end
                } while (!manager.isEmpty(i));
            }
        }
    }

    /**
     * Recomputes the consumer counters from scratch and logs an error when nodes were left unprocessed or when the
     * producers and the consumers disagree on the number of nodes.
     */
    private void updateStats(List<Consumer> consumers, List<Producer> producers) {
        // both counters describe the import that just ended, so neither may carry over from a previous run
        processedNodesConsumer = 0;
        nbDocsCreated = 0;
        for (Consumer c : consumers) {
            processedNodesConsumer += c.getNbProcessed();
            nbDocsCreated += c.getNbDocsCreated();
        }
        if (unprocessedNodesConsumer > 0) {
            log.error("Total number of unprocessed doc because of consumers unexpected end: "
                    + unprocessedNodesConsumer);
        }
        // the consumers share the output of all producers: compare totals, not each producer against the total
        long producedNodes = 0;
        for (Producer producer : producers) {
            producedNodes += producer.getNbProcessed();
        }
        if (producedNodes != processedNodesConsumer) {
            log.error(String.format(
                    "Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost",
                    producedNodes, processedNodesConsumer));
        }
    }

    /**
     * Blocks until every consumer reports itself terminated or no consumer thread is alive anymore, then forcibly
     * stops the consumers that are still not terminated and joins the consumer threads.
     *
     * @return the {@link InterruptedException} received while waiting, or {@code null}
     */
    private Exception waitForConsumers(List<Consumer> consumers) {
        Exception ret = null;
        try {
            // once every thread is dead nothing can flip the remaining flags, so stop waiting
            while (!consumersTerminated(consumers) && anyConsumerThreadAlive()) {
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("importer: Got an InterruptedException", e);
            ret = e;
            ExceptionUtils.checkInterrupt(e);
        } finally {
            for (Consumer consumer : consumers) {
                if (!consumer.isTerminated()) {
                    log.warn("Forcibly stopping consumer");
                    consumer.mustStop();
                }
            }
        }
        log.info("importer: All consumers has terminated their work.");
        for (Thread thread : consumerThreads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                log.error("importer: Got an InterruptedException", e);
                ExceptionUtils.checkInterrupt(e);
            }
        }
        log.info("importer: All consumers threads terminated");
        consumerThreads.clear();

        long processed = 0;
        int i = 0;
        for (Consumer consumer : consumers) {
            processed += consumer.getNbProcessed();
            i += 1;
        }
        log.info("importer: " + i + " consumers terminated, processed: " + processed);

        return ret;
    }

    /** Tells whether at least one of the started consumer threads is still running. */
    private boolean anyConsumerThreadAlive() {
        for (Thread thread : consumerThreads) {
            if (thread.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /** Returns {@code current} when it is set, {@code candidate} otherwise. */
    private static Exception firstNonNull(Exception current, Exception candidate) {
        return current != null ? current : candidate;
    }

    /** Signals every consumer that the producers are done, through {@code TaskRunner#canStop}. */
    private void consumersCanStop(List<Consumer> consumers) {
        consumers.forEach(TaskRunner::canStop);
    }

    /**
     * Blocks while the current producer thread is alive and the producer is not terminated, then joins the thread.
     * A producer that is still not terminated afterwards is forcibly stopped.
     *
     * @param producer the producer running on {@link #producerThread}
     * @return the {@link InterruptedException} received while waiting, or {@code null}
     */
    protected Exception waitForProducer(Producer producer) {
        Exception ret = null;
        try {
            while (producerThread.isAlive() && !producer.isTerminated()) {
                Thread.sleep(100);
            }
            log.info("importer: producer terminated its work");
            producerThread.join();
            log.info("importer: producer thread terminated");
            producerThread = null;
        } catch (InterruptedException e) {
            log.error("importer: Got an InterruptedException", e);
            ExceptionUtils.checkInterrupt(e);
            ret = e;
        } finally {
            if (!producer.isTerminated()) {
                log.warn("Forcibly stopping producer");
                producer.mustStop();
            }
        }
        log.info("importer: producer terminated processed: " + producer.getNbProcessed());
        return ret;
    }

    /**
     * @return {@code true} when every given consumer reports itself terminated (also for an empty list)
     */
    protected boolean consumersTerminated(List<Consumer> consumers) {
        for (Consumer c : consumers) {
            if (!c.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates one consumer per queue of the manager and starts each of them on its own thread, named
     * {@code import-Consumer<queue index>} and recorded in {@link #consumerThreads}.
     *
     * @return the created consumers, in queue order
     */
    private List<Consumer> startConsumerPool(QueuesManager manager, DocumentModel root, int batchSize,
            ConsumerFactory factory) {
        ArrayList<Consumer> ret = new ArrayList<>(manager.count());
        for (int i = 0; i < manager.count(); i++) {
            final Consumer c = factory.createConsumer(log, root, batchSize, manager, i);
            ret.add(c);
            Thread ct = new Thread(c);
            ct.setName("import-Consumer" + i);
            ct.setUncaughtExceptionHandler((t, e) -> {
                log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e);
                // same handling as the producer thread: actually request the stop that the message announces
                c.mustStop();
            });
            ct.start();
            consumerThreads.add(ct);
        }
        return ret;
    }

    /**
     * Starts the given producer on a new thread named {@code import-Producer} and records it in
     * {@link #producerThread}. An uncaught exception in that thread is logged and the producer is asked to stop.
     */
    protected void startProducerThread(final Producer producer) {
        Thread p = new Thread(producer);
        p.setName("import-Producer");
        p.setUncaughtExceptionHandler((t, e) -> {
            log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e);
            producer.mustStop();
        });
        p.start();
        producerThread = p;
    }

    public ImportStat getImportStat() {
        return importStat;
    }

    /**
     * Registers a filter that is notified before and after each import.
     */
    public void addFilter(ImporterFilter filter) {
        log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter.toString(),
                hashCode()));
        filters.add(filter);
    }

    /**
     * @return the number of documents created by the consumers, as computed at the end of the last import
     */
    public long getCreatedDocsCounter() {
        return nbDocsCreated;
    }

    /** Calls {@link ImporterFilter#handleBeforeImport()} on every registered filter, in registration order. */
    protected void enableFilters() {
        for (ImporterFilter filter : filters) {
            log.debug(String.format("Running filter with %s, on the importer with the hash code %s.",
                    filter.toString(), hashCode()));
            filter.handleBeforeImport();
        }
        if (filters.size() == 0) {
            log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode()));
        }
    }

    /**
     * Calls {@link ImporterFilter#handleAfterImport(Exception)} on every registered filter.
     *
     * @param finalException the failure recorded for the import, or {@code null} when none was recorded
     */
    protected void disableFilters(Exception finalException) {
        for (ImporterFilter filter : filters) {
            filter.handleAfterImport(finalException);
        }
    }

    public boolean isRunning() {
        return isRunning;
    }

}