001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import org.javasimon.SimonManager; 025import org.javasimon.Stopwatch; 026import org.nuxeo.common.utils.ExceptionUtils; 027import org.nuxeo.ecm.core.api.CloseableCoreSession; 028import org.nuxeo.ecm.core.api.CoreInstance; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.DocumentNotFoundException; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.core.api.PathRef; 034import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory; 035import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 036import org.nuxeo.ecm.platform.importer.filter.ImporterFilter; 037import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 038import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 039import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener; 040import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 041import org.nuxeo.ecm.platform.importer.log.PerfLogger; 042import org.nuxeo.ecm.platform.importer.source.SourceNode; 043import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy; 044import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047import java.io.IOException; 048import java.util.ArrayList; 049import java.util.Arrays; 050import java.util.Collection; 051import java.util.List; 052import java.util.Map; 053import java.util.concurrent.ConcurrentHashMap; 054import java.util.concurrent.LinkedBlockingQueue; 055import java.util.concurrent.ThreadFactory; 056import java.util.concurrent.ThreadPoolExecutor; 057import java.util.concurrent.TimeUnit; 058import java.util.concurrent.atomic.AtomicInteger; 059 060/** 061 * Generic importer 062 * 063 * @author Thierry Delprat 064 */ 065public class GenericMultiThreadedImporter implements ImporterRunner { 066 067 protected static ThreadPoolExecutor importTP; 068 069 protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 070 071 protected ImporterThreadingPolicy threadPolicy; 072 073 protected ImporterDocumentModelFactory factory; 074 075 protected SourceNode importSource; 076 077 protected DocumentModel targetContainer; 078 079 protected Integer batchSize = 50; 080 081 protected Integer nbThreads = 5; 082 083 protected Integer transactionTimeout = 0; 084 085 protected ImporterLogger log; 086 087 protected CoreSession session; 088 089 protected String importWritePath; 090 091 protected Boolean skipRootContainerCreation = false; 092 093 protected String jobName; 094 095 protected boolean enablePerfLogging = true; 096 097 protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>(); 098 099 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 100 101 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 102 103 protected GenericThreadedImportTask rootImportTask; 104 105 protected final static int DEFAULT_QUEUE_SIZE = 10000; 106 107 protected int queueSize = DEFAULT_QUEUE_SIZE; 108 109 protected String repositoryName; 110 111 protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" }; 112 113 public static ThreadPoolExecutor getExecutor() { 114 return importTP; 115 } 116 117 public static synchronized void addCreatedDoc(String taskId, long nbDocs) { 118 String tid = Thread.currentThread().getName(); 119 nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs); 120 } 121 122 public static synchronized long getCreatedDocsCounter() { 123 long counter = 0; 124 for (String tid : nbCreatedDocsByThreads.keySet()) { 125 Long tCounter = nbCreatedDocsByThreads.get(tid); 126 if (tCounter != null) { 127 counter += tCounter; 128 } 129 } 130 return counter; 131 } 132 133 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 134 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) { 135 importSource = sourceNode; 136 this.importWritePath = importWritePath; 137 this.log = log; 138 if (batchSize != null) { 139 this.batchSize = batchSize; 140 } 141 if (nbThreads != null) { 142 this.nbThreads = nbThreads; 143 } 144 if (skipRootContainerCreation != null) { 145 this.skipRootContainerCreation = skipRootContainerCreation; 146 } 147 } 148 149 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 150 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) { 151 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE); 152 } 153 154 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 155 Integer nbThreads, ImporterLogger log) { 156 this(sourceNode, importWritePath, false, batchSize, nbThreads, log); 157 } 158 159 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 160 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) { 161 162 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log); 163 this.jobName = jobName; 164 if (jobName != null) { 165 listeners.add(new JobHistoryListener(jobName)); 166 } 167 } 168 169 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 170 Integer nbThreads, String jobName, ImporterLogger log) { 171 this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log); 172 } 173 174 public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) { 175 this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation, 176 configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log); 177 repositoryName = configuration.repositoryName; 178 } 179 180 public void addFilter(ImporterFilter filter) { 181 log.debug(String.format( 182 "Filter with %s, was added on the importer with the hash code %s. The source node name is %s", 183 filter.toString(), this.hashCode(), importSource.getName())); 184 filters.add(filter); 185 } 186 187 public void addListeners(ImporterListener... listeners) { 188 addListeners(Arrays.asList(listeners)); 189 } 190 191 public void addListeners(Collection<ImporterListener> listeners) { 192 this.listeners.addAll(listeners); 193 } 194 195 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 196 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 197 } 198 199 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 200 this.importingDocumentFilters.addAll(importingDocumentFilters); 201 } 202 203 @Override 204 public void run() { 205 Exception finalException = null; 206 if (!TransactionHelper.isTransactionActive()) { 207 TransactionHelper.startTransaction(); 208 } 209 try (CloseableCoreSession closeableCoreSession = CoreInstance.openCoreSessionSystem(repositoryName)) { 210 session = closeableCoreSession; 211 for (ImporterFilter filter : filters) { 212 log.debug(String.format( 213 "Running filter with %s, on the importer with the hash code %s. The source node name is %s", 214 filter.toString(), this.hashCode(), importSource.getName())); 215 filter.handleBeforeImport(); 216 } 217 if (filters.size() == 0) { 218 log.debug(String.format( 219 "No filters are registered on the importer with hash code %s, while importing the source node with name %s", 220 this.hashCode(), importSource.getName())); 221 } 222 doRun(); 223 } catch (Exception e) { // deals with interrupt below 224 ExceptionUtils.checkInterrupt(e); 225 log.error("Task exec failed", e); 226 finalException = e; 227 } finally { 228 for (ImporterFilter filter : filters) { 229 filter.handleAfterImport(finalException); 230 } 231 session = null; 232 } 233 } 234 235 public void setRootImportTask(GenericThreadedImportTask rootImportTask) { 236 this.rootImportTask = rootImportTask; 237 } 238 239 protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer, 240 boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) { 241 if (rootImportTask == null) { 242 setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer, 243 skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName)); 244 } else { 245 rootImportTask.setInputSource(importSource); 246 rootImportTask.setTargetFolder(targetContainer); 247 rootImportTask.setSkipContainerCreation(skipRootContainerCreation); 248 rootImportTask.setRsLogger(log); 249 rootImportTask.setFactory(getFactory()); 250 rootImportTask.setThreadPolicy(getThreadPolicy()); 251 rootImportTask.setJobName(jobName); 252 rootImportTask.setBatchSize(batchSize); 253 } 254 rootImportTask.addListeners(listeners); 255 rootImportTask.addImportingDocumentFilters(importingDocumentFilters); 256 rootImportTask.setTransactionTimeout(transactionTimeout); 257 return rootImportTask; 258 } 259 260 /** 261 * Creates non-daemon threads at normal priority. 262 */ 263 public static class NamedThreadFactory implements ThreadFactory { 264 265 private final AtomicInteger threadNumber = new AtomicInteger(); 266 267 private final ThreadGroup group; 268 269 private final String prefix; 270 271 public NamedThreadFactory(String prefix) { 272 SecurityManager sm = System.getSecurityManager(); 273 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 274 this.prefix = prefix; 275 } 276 277 @Override 278 public Thread newThread(Runnable r) { 279 String name = prefix + threadNumber.incrementAndGet(); 280 Thread thread = new Thread(group, r, name); 281 // do not set daemon 282 thread.setPriority(Thread.NORM_PRIORITY); 283 return thread; 284 } 285 } 286 287 protected void doRun() throws IOException { 288 289 targetContainer = getTargetContainer(); 290 291 nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 292 293 importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS, 294 new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-")); 295 296 initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName); 297 298 rootImportTask.setRootTask(); 299 long t0 = System.currentTimeMillis(); 300 301 notifyBeforeImport(); 302 303 importTP.execute(rootImportTask); 304 sleep(200); 305 int activeTasks = importTP.getActiveCount(); 306 int oldActiveTasks = 0; 307 long lastLogProgressTime = System.currentTimeMillis(); 308 long lastCreatedDocCounter = 0; 309 310 PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null; 311 while (activeTasks > 0) { 312 sleep(500); 313 activeTasks = importTP.getActiveCount(); 314 boolean logProgress = false; 315 if (oldActiveTasks != activeTasks) { 316 oldActiveTasks = activeTasks; 317 log.debug("currently " + activeTasks + " active import Threads"); 318 logProgress = true; 319 320 } 321 long ti = System.currentTimeMillis(); 322 if (ti - lastLogProgressTime > 5000) { 323 logProgress = true; 324 } 325 if (logProgress) { 326 long inbCreatedDocs = getCreatedDocsCounter(); 327 long deltaT = ti - lastLogProgressTime; 328 double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0)); 329 double imediateSpeed = averageSpeed; 330 if (deltaT > 0) { 331 imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT)); 332 } 333 log.info(inbCreatedDocs + " docs created"); 334 log.info("average speed = " + averageSpeed + " docs/s"); 335 log.info("immediate speed = " + imediateSpeed + " docs/s"); 336 337 if (enablePerfLogging) { 338 Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed }; 339 perfLogger.log(perfData); 340 } 341 342 lastLogProgressTime = ti; 343 lastCreatedDocCounter = inbCreatedDocs; 344 } 345 } 346 stopImportProcrocess(); 347 log.info("All Threads terminated"); 348 if (enablePerfLogging) { 349 perfLogger.release(); 350 } 351 notifyAfterImport(); 352 353 long t1 = System.currentTimeMillis(); 354 long nbCreatedDocs = getCreatedDocsCounter(); 355 log.info(nbCreatedDocs + " docs created"); 356 log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s"); 357 for (String k : nbCreatedDocsByThreads.keySet()) { 358 log.info(k + " --> " + nbCreatedDocsByThreads.get(k)); 359 } 360 Stopwatch stopwatch; 361 for (String name : SimonManager.simonNames()) { 362 if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) { 363 continue; 364 } 365 stopwatch = SimonManager.getStopwatch(name); 366 if (stopwatch.getCounter() > 0) { 367 log.info(stopwatch.toString()); 368 } 369 } 370 371 } 372 373 protected static void sleep(int millis) { 374 try { 375 Thread.sleep(millis); 376 } catch (InterruptedException e) { 377 Thread.currentThread().interrupt(); 378 throw new NuxeoException(e); 379 } 380 } 381 382 protected DocumentModel getTargetContainer() { 383 if (targetContainer == null) { 384 targetContainer = createTargetContainer(); 385 } 386 return targetContainer; 387 } 388 389 /** 390 * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be 391 * overridden in subclasses. 392 * 393 * @return 394 */ 395 protected DocumentModel createTargetContainer() { 396 try { 397 return session.getDocument(new PathRef(importWritePath)); 398 } catch (DocumentNotFoundException e) { 399 log.error(e.getMessage()); 400 throw e; 401 } 402 } 403 404 public ImporterThreadingPolicy getThreadPolicy() { 405 if (threadPolicy == null) { 406 threadPolicy = new DefaultMultiThreadingPolicy(); 407 } 408 return threadPolicy; 409 } 410 411 public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 412 this.threadPolicy = threadPolicy; 413 } 414 415 public ImporterDocumentModelFactory getFactory() { 416 if (factory == null) { 417 factory = new DefaultDocumentModelFactory(); 418 } 419 return factory; 420 } 421 422 public void setFactory(ImporterDocumentModelFactory factory) { 423 this.factory = factory; 424 } 425 426 /** 427 * @since 5.9.4 428 */ 429 public void setTransactionTimeout(int transactionTimeout) { 430 this.transactionTimeout = transactionTimeout; 431 } 432 433 public void setEnablePerfLogging(boolean enablePerfLogging) { 434 this.enablePerfLogging = enablePerfLogging; 435 } 436 437 public void stopImportProcrocess() { 438 if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) { 439 importTP.shutdownNow(); 440 } 441 } 442 443 protected void notifyBeforeImport() { 444 for (ImporterListener listener : listeners) { 445 listener.beforeImport(); 446 } 447 } 448 449 protected void notifyAfterImport() { 450 for (ImporterListener listener : listeners) { 451 listener.afterImport(); 452 } 453 } 454 455 /** 456 * @since 7.1 457 */ 458 public String getRepositoryName() { 459 return repositoryName; 460 } 461 462 /** 463 * @since 7.1 464 */ 465 public void setRepositoryName(String repositoryName) { 466 this.repositoryName = repositoryName; 467 } 468 469}