001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 */ 017package org.nuxeo.ecm.platform.importer.queue; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import org.joda.time.DateTime; 023import org.joda.time.Minutes; 024import org.nuxeo.common.utils.ExceptionUtils; 025import org.nuxeo.ecm.core.api.CoreInstance; 026import org.nuxeo.ecm.core.api.CoreSession; 027import org.nuxeo.ecm.core.api.DocumentModel; 028import org.nuxeo.ecm.core.api.PathRef; 029import org.nuxeo.ecm.platform.importer.filter.ImporterFilter; 030import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 031import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer; 032import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory; 033import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat; 034import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager; 035import org.nuxeo.ecm.platform.importer.queue.producer.Producer; 036 037/** 038 * @since 8.3 039 */ 040public class QueueImporter { 041 042 protected ImporterLogger log = null; 043 044 private boolean isTerminated = false; 045 046 private boolean mustStop = false; 047 048 protected long processedNodesConsumer = 0L; 049 050 protected ImportStat importStat = new ImportStat(); 051 052 protected long nbDocsCreated = 0L; 053 054 protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>(); 055 056 public QueueImporter(ImporterLogger log) { 057 this.log = log; 058 } 059 060 public void mustStop() { 061 mustStop = true; 062 } 063 064 public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName, 065 int batchSize, ConsumerFactory factory) { 066 log.info("Starting import process"); 067 068 producer.init(manager); 069 070 // start the producer 071 Thread p = new Thread(producer); 072 p.setName("import-Producer"); 073 074 p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 075 076 @Override 077 public void uncaughtException(Thread t, Throwable e) { 078 log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e); 079 producer.mustStop(); 080 } 081 }); 082 p.start(); 083 084 Exception finalException = null; 085 CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName); 086 DateTime importStarted = new DateTime(); 087 try { 088 List<Thread> consumerThreads = new ArrayList<Thread>(); 089 List<Consumer> consumers = new ArrayList<Consumer>(); 090 enableFilters(); 091 092 DocumentModel root = session.getDocument(new PathRef(importPath)); 093 for (int i = 0; i < manager.getNBConsumers(); i++) { 094 Consumer c; 095 c = factory.createConsumer(log, root, batchSize, manager.getQueue(i)); 096 097 consumers.add(c); 098 Thread ct = new Thread(c); 099 100 ct.setName("import-Consumer" + i); 101 ct.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 102 103 @Override 104 public void uncaughtException(Thread t, Throwable e) { 105 log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e); 106 c.mustStop(); 107 } 108 }); 109 ct.start(); 110 consumerThreads.add(ct); 111 } 112 113 try { 114 while (!producer.isTerminated() && !mustStop) { 115 Thread.sleep(50); 116 log.debug("waiting for producer to be completed. Processed docs: " + producer.getNbProcessed()); 117 118 // Check if consumers are still alive 119 boolean consumersTerminated = true; 120 for (Consumer c : consumers) { 121 consumersTerminated = consumersTerminated && c.isTerminated(); 122 } 123 // Otherwise stop the producer 124 if (consumersTerminated) { 125 log.error("all consumers are terminated,but producer is still alive. Stopping producer."); 126 producer.mustStop(); 127 } 128 } 129 } catch (InterruptedException e) { 130 log.error("Error while waiting for producder", e); 131 finalException = e; 132 ExceptionUtils.checkInterrupt(e); 133 } finally { 134 if (!producer.isTerminated()) { 135 log.warn("Forcibly stopping producer"); 136 producer.mustStop(); 137 } 138 } 139 140 Exception pe = producer.getError(); 141 if (pe != null) { 142 log.error("Error during producer execution", pe); 143 finalException = pe; 144 for (Consumer c : consumers) { 145 c.mustStop(); 146 } 147 } else { 148 for (Consumer c : consumers) { 149 c.canStop(); 150 } 151 } 152 153 try { 154 int iLoop = 0; 155 156 while (!isTerminated && !mustStop) { 157 double totalImmediateThroughput = 0; 158 double totalThroughput = 0; 159 nbDocsCreated = 0; 160 161 Thread.sleep(100); 162 isTerminated = true; 163 iLoop++; 164 for (Consumer c : consumers) { 165 isTerminated = isTerminated && c.isTerminated(); 166 nbDocsCreated += c.getNbDocsCreated(); 167 totalImmediateThroughput += c.getImmediateThroughput(); 168 totalThroughput += c.getThroughput(); 169 } 170 171 if (iLoop % 30 == 0) { 172 log.debug("waiting for consumers to be completed. Processed Docs: " + nbDocsCreated + " -- " 173 + totalImmediateThroughput + " docs/s -- " + totalThroughput + " docs/s"); 174 } 175 } 176 // Consumers are done, get total number of nodes imported 177 for (Consumer c : consumers) { 178 processedNodesConsumer += c.getNbProcessed(); 179 } 180 nbDocsCreated = 0; 181 // Consumers are done, get total number of docs created 182 for (Consumer c : consumers) { 183 nbDocsCreated += c.getNbDocsCreated(); 184 importStat.merge(c.getImportStat()); 185 } 186 187 } catch (InterruptedException e) { 188 log.error("Error while waiting for consumers", e); 189 finalException = e; 190 ExceptionUtils.checkInterrupt(e); 191 } finally { 192 for (Consumer c : consumers) { 193 if (!c.isTerminated()) { 194 log.warn("Forcibly stopping consumer"); 195 c.mustStop(); 196 } 197 } 198 } 199 200 } catch (Exception e) { 201 log.error("Error while importing", e); 202 finalException = e; 203 } finally { 204 disableFilters(finalException); 205 if (session != null) { 206 session.close(); 207 session = null; 208 } 209 } 210 211 if (producer.getNbProcessed() != processedNodesConsumer) { 212 log.error( 213 String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost", 214 producer.getNbProcessed(), processedNodesConsumer)); 215 } 216 217 DateTime importFinished = new DateTime(); 218 219 log.info(String.format("End of import process : Imported %s docs in %s mn. ", nbDocsCreated, 220 Minutes.minutesBetween(importStarted, importFinished).getMinutes())); 221 222 } 223 224 public ImportStat getImportStat() { 225 return importStat; 226 } 227 228 public void addFilter(ImporterFilter filter) { 229 log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter.toString(), 230 hashCode())); 231 filters.add(filter); 232 } 233 234 public boolean isRunning() { 235 return !isTerminated; 236 } 237 238 public long getCreatedDocsCounter() { 239 return nbDocsCreated; 240 } 241 242 protected void enableFilters() { 243 for (ImporterFilter filter : filters) { 244 log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", filter.toString(), 245 hashCode())); 246 filter.handleBeforeImport(); 247 } 248 if (filters.size() == 0) { 249 log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode())); 250 } 251 } 252 253 protected void disableFilters(Exception finalException) { 254 for (ImporterFilter filter : filters) { 255 filter.handleAfterImport(finalException); 256 } 257 258 } 259 260}