/*
 * (C) Copyright 2006-2008 Nuxeo SAS (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Nuxeo - initial API and implementation
 *
 * $Id$
 */

package org.nuxeo.ecm.platform.importer.base;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.javasimon.SimonManager;
import org.javasimon.Stopwatch;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.log.PerfLogger;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;

/**
 * Generic importer.
 * <p>
 * {@link #run()} opens a system session, runs the registered {@link ImporterFilter}s, then submits a root
 * {@link GenericThreadedImportTask} to a thread pool and polls that pool until no task is active, logging progress
 * along the way.
 * <p>
 * NOTE(review): the executor ({@link #importTP}) and the per-thread counters ({@link #nbCreatedDocsByThreads}) are
 * static and are replaced on every {@link #doRun()}, so two importers running at the same time in one JVM would
 * overwrite each other's pool and counters.
 *
 * @author Thierry Delprat
 */
public class GenericMultiThreadedImporter implements ImporterRunner {

    /** Executor of the current (most recently started) import; replaced by each {@link #doRun()}. */
    protected static ThreadPoolExecutor importTP;

    /** Created-document counters keyed by {@code <thread name>-<task id>}; replaced by each {@link #doRun()}. */
    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();

    protected ImporterThreadingPolicy threadPolicy;

    protected ImporterDocumentModelFactory factory;

    protected SourceNode importSource;

    protected DocumentModel targetContainer;

    protected Integer batchSize = 50;

    protected Integer nbThreads = 5;

    protected Integer transactionTimeout = 0;

    protected ImporterLogger log;

    protected CoreSession session;

    protected String importWritePath;

    protected Boolean skipRootContainerCreation = false;

    protected String jobName;

    protected boolean enablePerfLogging = true;

    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();

    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();

    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();

    protected GenericThreadedImportTask rootImportTask;

    protected final static int DEFAULT_QUEUE_SIZE = 10000;

    /** Capacity of the work queue backing {@link #importTP}. */
    protected int queueSize = DEFAULT_QUEUE_SIZE;

    protected String repositoryName;

    /**
     * @return the executor created by the last {@link #doRun()}, or {@code null} if no import was started yet
     */
    public static ThreadPoolExecutor getExecutor() {
        return importTP;
    }

    /**
     * Records the number of documents created by a task, under a key made of the calling thread's name and the
     * task id. A later call with the same key overwrites the previous value.
     *
     * @param taskId the id of the reporting task
     * @param nbDocs the number of documents to record for that key
     */
    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
        String tid = Thread.currentThread().getName();
        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
    }

    /**
     * @return the sum of all counters currently recorded through {@link #addCreatedDoc(String, long)}
     */
    public static synchronized long getCreatedDocsCounter() {
        long counter = 0;
        for (Long tCounter : nbCreatedDocsByThreads.values()) {
            // the field is a plain Map that subclasses may replace, so null values are still tolerated
            if (tCounter != null) {
                counter += tCounter;
            }
        }
        return counter;
    }

    /**
     * Creates an importer.
     *
     * @param sourceNode the root node to import
     * @param importWritePath the repository path of the container to import into
     * @param skipRootContainerCreation passed to the root import task; {@code null} keeps the default
     *            ({@code false})
     * @param batchSize passed to the root import task; {@code null} keeps the default (50)
     * @param nbThreads the size of the import thread pool; {@code null} keeps the default (5)
     * @param log the logger used by the importer
     * @param queueSize the capacity of the thread pool work queue; a value that is not strictly positive keeps
     *            {@link #DEFAULT_QUEUE_SIZE}
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
        importSource = sourceNode;
        this.importWritePath = importWritePath;
        this.log = log;
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
        if (nbThreads != null) {
            this.nbThreads = nbThreads;
        }
        if (skipRootContainerCreation != null) {
            this.skipRootContainerCreation = skipRootContainerCreation;
        }
        // The argument used to be silently dropped. LinkedBlockingQueue rejects a non-positive capacity, so such
        // values fall back to the default instead of failing later in doRun().
        if (queueSize > 0) {
            this.queueSize = queueSize;
        }
    }

    /**
     * Same as the 7-argument constructor, using {@link #DEFAULT_QUEUE_SIZE} as queue size.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
    }

    /**
     * Same as the 6-argument constructor, with {@code skipRootContainerCreation} set to {@code false}.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
    }

    /**
     * Creates an importer with a job name. When {@code jobName} is not {@code null}, a {@link JobHistoryListener}
     * for that name is registered as listener.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {

        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
        this.jobName = jobName;
        if (jobName != null) {
            listeners.add(new JobHistoryListener(jobName));
        }
    }

    /**
     * Same as the job-name constructor, with {@code skipRootContainerCreation} set to {@code false}.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, String jobName, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
    }

    /**
     * Creates an importer from the fields of the given configuration, including its repository name.
     */
    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
        repositoryName = configuration.repositoryName;
    }

    /**
     * Registers a filter whose callbacks are invoked by {@link #run()} before and after the import.
     */
    public void addFilter(ImporterFilter filter) {
        log.debug(String.format(
                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
                filter.toString(), this.hashCode(), importSource.getName()));
        filters.add(filter);
    }

    public void addListeners(ImporterListener... listeners) {
        addListeners(Arrays.asList(listeners));
    }

    public void addListeners(Collection<ImporterListener> listeners) {
        this.listeners.addAll(listeners);
    }

    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
    }

    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
        this.importingDocumentFilters.addAll(importingDocumentFilters);
    }

    /**
     * Runs the import: opens a system session on {@link #repositoryName}, calls
     * {@link ImporterFilter#handleBeforeImport()} on every filter, then {@link #doRun()}.
     * <p>
     * An exception raised on the way is logged and handed to every filter through
     * {@link ImporterFilter#handleAfterImport(Exception)} instead of being rethrown by this method itself. The
     * session is closed in all cases, including when a filter's after-import callback throws.
     */
    @Override
    public void run() {
        Exception finalException = null;
        try {
            session = CoreInstance.openCoreSessionSystem(repositoryName);
            for (ImporterFilter filter : filters) {
                log.debug(String.format(
                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
                        filter.toString(), this.hashCode(), importSource.getName()));
                filter.handleBeforeImport();
            }
            if (filters.isEmpty()) {
                log.debug(String.format(
                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
                        this.hashCode(), importSource.getName()));
            }
            doRun();
        } catch (Exception e) { // deals with interrupt below
            ExceptionUtils.checkInterrupt(e);
            log.error("Task exec failed", e);
            finalException = e;
        } finally {
            try {
                for (ImporterFilter filter : filters) {
                    filter.handleAfterImport(finalException);
                }
            } finally {
                // nested finally: a failing filter callback must not leak the session
                if (session != null) {
                    session.close();
                    session = null;
                }
            }
        }
    }

    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
        this.rootImportTask = rootImportTask;
    }

    /**
     * Creates the root import task if none was set through {@link #setRootImportTask}, otherwise reconfigures the
     * existing one with the given values. In both cases the registered listeners, importing document filters and
     * the transaction timeout are then pushed onto the task.
     *
     * @return the configured root task (also available as {@link #rootImportTask})
     */
    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
        if (rootImportTask == null) {
            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
        } else {
            rootImportTask.setInputSource(importSource);
            rootImportTask.setTargetFolder(targetContainer);
            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
            rootImportTask.setRsLogger(log);
            rootImportTask.setFactory(getFactory());
            rootImportTask.setThreadPolicy(getThreadPolicy());
            rootImportTask.setJobName(jobName);
            rootImportTask.setBatchSize(batchSize);
        }
        rootImportTask.addListeners(listeners);
        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
        rootImportTask.setTransactionTimeout(transactionTimeout);
        return rootImportTask;
    }

    /**
     * Creates threads named {@code <prefix><n>} at normal priority.
     * <p>
     * The daemon flag is deliberately not set, so each thread inherits it from the thread that asks the factory
     * for a new thread.
     */
    public static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }

    /**
     * Performs the import: resets the static counters, creates a new thread pool of {@link #nbThreads} threads
     * with a queue of {@link #queueSize} entries, submits the root task and then polls the pool every 500 ms until
     * it reports no active task. Progress is logged whenever the number of active tasks changes or more than 5
     * seconds elapsed since the last progress log.
     * <p>
     * The pool is shut down and the performance logger released even when the monitoring fails, for instance when
     * the calling thread is interrupted while sleeping.
     *
     * @throws IOException declared for subclasses and callers; nothing in this implementation throws it directly
     */
    protected void doRun() throws IOException {

        targetContainer = getTargetContainer();

        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();

        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));

        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);

        rootImportTask.setRootTask();
        long t0 = System.currentTimeMillis();

        notifyBeforeImport();

        PerfLogger perfLogger = null;
        try {
            importTP.execute(rootImportTask);
            // give the root task a chance to start before the first activity check
            sleep(200);
            int activeTasks = importTP.getActiveCount();
            int oldActiveTasks = 0;
            long lastLogProgressTime = System.currentTimeMillis();
            long lastCreatedDocCounter = 0;

            String[] headers = { "nbDocs", "average", "imediate" };
            perfLogger = new PerfLogger(headers);
            while (activeTasks > 0) {
                sleep(500);
                activeTasks = importTP.getActiveCount();
                boolean logProgress = false;
                if (oldActiveTasks != activeTasks) {
                    oldActiveTasks = activeTasks;
                    log.debug("currently " + activeTasks + " active import Threads");
                    logProgress = true;

                }
                long ti = System.currentTimeMillis();
                if (ti - lastLogProgressTime > 5000) {
                    logProgress = true;
                }
                if (logProgress) {
                    long inbCreatedDocs = getCreatedDocsCounter();
                    long deltaT = ti - lastLogProgressTime;
                    double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
                    // falls back to the average when no time elapsed since the previous progress log
                    double imediateSpeed = averageSpeed;
                    if (deltaT > 0) {
                        imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
                    }
                    log.info(inbCreatedDocs + " docs created");
                    log.info("average speed = " + averageSpeed + " docs/s");
                    log.info("immediate speed = " + imediateSpeed + " docs/s");

                    if (enablePerfLogging) {
                        Double[] perfData = { Double.valueOf((double) inbCreatedDocs), averageSpeed, imediateSpeed };
                        perfLogger.log(perfData);
                    }

                    lastLogProgressTime = ti;
                    lastCreatedDocCounter = inbCreatedDocs;
                }
            }
        } finally {
            // always executed so that a failure while monitoring leaks neither the pool threads nor the logger
            stopImportProcrocess();
            if (perfLogger != null) {
                perfLogger.release();
            }
        }
        log.info("All Threads terminated");
        notifyAfterImport();

        long t1 = System.currentTimeMillis();
        long nbCreatedDocs = getCreatedDocsCounter();
        log.info(nbCreatedDocs + " docs created");
        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
        for (Map.Entry<String, Long> entry : nbCreatedDocsByThreads.entrySet()) {
            log.info(entry.getKey() + " --> " + entry.getValue());
        }
        logImporterStopwatches();

    }

    /**
     * Logs every Simon stopwatch whose name starts with {@code org.nuxeo.ecm.platform.importer} and whose counter
     * is greater than zero.
     */
    private void logImporterStopwatches() {
        for (String name : SimonManager.simonNames()) {
            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
                continue;
            }
            Stopwatch stopwatch = SimonManager.getStopwatch(name);
            if (stopwatch.getCounter() > 0) {
                log.info(stopwatch.toString());
            }
        }
    }

    /**
     * Sleeps for the given duration.
     *
     * @param millis the time to sleep, in milliseconds
     * @throws NuxeoException if the thread is interrupted while sleeping; the interrupt status is restored first
     */
    protected static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    /**
     * @return the target container, created through {@link #createTargetContainer()} on first access
     */
    protected DocumentModel getTargetContainer() {
        if (targetContainer == null) {
            targetContainer = createTargetContainer();
        }
        return targetContainer;
    }

    /**
     * Creates the target container where the import will be done. Can be overridden in subclasses.
     * <p>
     * This implementation does not create anything: it fetches the existing document at {@link #importWritePath}
     * using the session opened by {@link #run()}.
     *
     * @return the document located at {@link #importWritePath}
     * @throws DocumentNotFoundException if there is no document at that path; the message is logged before the
     *             exception is rethrown
     */
    protected DocumentModel createTargetContainer() {
        try {
            return session.getDocument(new PathRef(importWritePath));
        } catch (DocumentNotFoundException e) {
            log.error(e.getMessage());
            throw e;
        }
    }

    /**
     * @return the threading policy, defaulting to a new {@link DefaultMultiThreadingPolicy} when none was set
     */
    public ImporterThreadingPolicy getThreadPolicy() {
        if (threadPolicy == null) {
            threadPolicy = new DefaultMultiThreadingPolicy();
        }
        return threadPolicy;
    }

    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
        this.threadPolicy = threadPolicy;
    }

    /**
     * @return the document model factory, defaulting to a new {@link DefaultDocumentModelFactory} when none was
     *         set
     */
    public ImporterDocumentModelFactory getFactory() {
        if (factory == null) {
            factory = new DefaultDocumentModelFactory();
        }
        return factory;
    }

    public void setFactory(ImporterDocumentModelFactory factory) {
        this.factory = factory;
    }

    /**
     * Sets the transaction timeout handed to the root import task.
     *
     * @since 5.9.4
     */
    public void setTransactionTimeout(int transactionTimeout) {
        this.transactionTimeout = transactionTimeout;
    }

    public void setEnablePerfLogging(boolean enablePerfLogging) {
        this.enablePerfLogging = enablePerfLogging;
    }

    /**
     * Calls {@code shutdownNow()} on the current executor unless there is none or it is already terminated or
     * terminating. The misspelled method name is kept because it is part of the public API.
     */
    public void stopImportProcrocess() {
        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
            importTP.shutdownNow();
        }
    }

    protected void notifyBeforeImport() {
        for (ImporterListener listener : listeners) {
            listener.beforeImport();
        }
    }

    protected void notifyAfterImport() {
        for (ImporterListener listener : listeners) {
            listener.afterImport();
        }
    }

    /**
     * @since 7.1
     */
    public String getRepositoryName() {
        return repositoryName;
    }

    /**
     * @since 7.1
     */
    public void setRepositoryName(String repositoryName) {
        this.repositoryName = repositoryName;
    }

}