001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import org.javasimon.SimonManager; 025import org.javasimon.Stopwatch; 026import org.nuxeo.common.utils.ExceptionUtils; 027import org.nuxeo.ecm.core.api.CoreInstance; 028import org.nuxeo.ecm.core.api.CoreSession; 029import org.nuxeo.ecm.core.api.DocumentModel; 030import org.nuxeo.ecm.core.api.DocumentNotFoundException; 031import org.nuxeo.ecm.core.api.NuxeoException; 032import org.nuxeo.ecm.core.api.PathRef; 033import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory; 034import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 035import org.nuxeo.ecm.platform.importer.filter.ImporterFilter; 036import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 037import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 038import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener; 039import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 040import org.nuxeo.ecm.platform.importer.log.PerfLogger; 041import org.nuxeo.ecm.platform.importer.source.SourceNode; 042import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy; 043import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 044import org.nuxeo.runtime.transaction.TransactionHelper; 045 046import java.io.IOException; 047import java.util.ArrayList; 048import java.util.Arrays; 049import java.util.Collection; 050import java.util.List; 051import java.util.Map; 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.LinkedBlockingQueue; 054import java.util.concurrent.ThreadFactory; 055import java.util.concurrent.ThreadPoolExecutor; 056import java.util.concurrent.TimeUnit; 057import java.util.concurrent.atomic.AtomicInteger; 058 059/** 060 * Generic importer 061 * 062 * @author Thierry Delprat 063 */ 064public class GenericMultiThreadedImporter implements ImporterRunner { 065 066 protected static ThreadPoolExecutor importTP; 067 068 protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 069 070 protected ImporterThreadingPolicy threadPolicy; 071 072 protected ImporterDocumentModelFactory factory; 073 074 protected SourceNode importSource; 075 076 protected DocumentModel targetContainer; 077 078 protected Integer batchSize = 50; 079 080 protected Integer nbThreads = 5; 081 082 protected Integer transactionTimeout = 0; 083 084 protected ImporterLogger log; 085 086 protected CoreSession session; 087 088 protected String importWritePath; 089 090 protected Boolean skipRootContainerCreation = false; 091 092 protected String jobName; 093 094 protected boolean enablePerfLogging = true; 095 096 protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>(); 097 098 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 099 100 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 101 102 protected GenericThreadedImportTask rootImportTask; 103 104 protected final static int DEFAULT_QUEUE_SIZE = 10000; 105 106 protected int queueSize = DEFAULT_QUEUE_SIZE; 107 108 protected String repositoryName; 109 110 protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" }; 111 112 public static ThreadPoolExecutor getExecutor() { 113 return importTP; 114 } 115 116 public static synchronized void addCreatedDoc(String taskId, long nbDocs) { 117 String tid = Thread.currentThread().getName(); 118 nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs); 119 } 120 121 public static synchronized long getCreatedDocsCounter() { 122 long counter = 0; 123 for (String tid : nbCreatedDocsByThreads.keySet()) { 124 Long tCounter = nbCreatedDocsByThreads.get(tid); 125 if (tCounter != null) { 126 counter += tCounter; 127 } 128 } 129 return counter; 130 } 131 132 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 133 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) { 134 importSource = sourceNode; 135 this.importWritePath = importWritePath; 136 this.log = log; 137 if (batchSize != null) { 138 this.batchSize = batchSize; 139 } 140 if (nbThreads != null) { 141 this.nbThreads = nbThreads; 142 } 143 if (skipRootContainerCreation != null) { 144 this.skipRootContainerCreation = skipRootContainerCreation; 145 } 146 } 147 148 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 149 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) { 150 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE); 151 } 152 153 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 154 Integer nbThreads, ImporterLogger log) { 155 this(sourceNode, importWritePath, false, batchSize, nbThreads, log); 156 } 157 158 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 159 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) { 160 161 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log); 162 this.jobName = jobName; 163 if (jobName != null) { 164 listeners.add(new JobHistoryListener(jobName)); 165 } 166 } 167 168 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 169 Integer nbThreads, String jobName, ImporterLogger log) { 170 this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log); 171 } 172 173 public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) { 174 this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation, 175 configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log); 176 repositoryName = configuration.repositoryName; 177 } 178 179 public void addFilter(ImporterFilter filter) { 180 log.debug(String.format( 181 "Filter with %s, was added on the importer with the hash code %s. The source node name is %s", 182 filter.toString(), this.hashCode(), importSource.getName())); 183 filters.add(filter); 184 } 185 186 public void addListeners(ImporterListener... listeners) { 187 addListeners(Arrays.asList(listeners)); 188 } 189 190 public void addListeners(Collection<ImporterListener> listeners) { 191 this.listeners.addAll(listeners); 192 } 193 194 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 195 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 196 } 197 198 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 199 this.importingDocumentFilters.addAll(importingDocumentFilters); 200 } 201 202 @Override 203 public void run() { 204 Exception finalException = null; 205 try { 206 if (!TransactionHelper.isTransactionActive()) { 207 TransactionHelper.startTransaction(); 208 } 209 session = CoreInstance.openCoreSessionSystem(repositoryName); 210 for (ImporterFilter filter : filters) { 211 log.debug(String.format( 212 "Running filter with %s, on the importer with the hash code %s. The source node name is %s", 213 filter.toString(), this.hashCode(), importSource.getName())); 214 filter.handleBeforeImport(); 215 } 216 if (filters.size() == 0) { 217 log.debug(String.format( 218 "No filters are registered on the importer with hash code %s, while importing the source node with name %s", 219 this.hashCode(), importSource.getName())); 220 } 221 doRun(); 222 } catch (Exception e) { // deals with interrupt below 223 ExceptionUtils.checkInterrupt(e); 224 log.error("Task exec failed", e); 225 finalException = e; 226 } finally { 227 for (ImporterFilter filter : filters) { 228 filter.handleAfterImport(finalException); 229 } 230 if (session != null) { 231 session.close(); 232 session = null; 233 } 234 } 235 } 236 237 public void setRootImportTask(GenericThreadedImportTask rootImportTask) { 238 this.rootImportTask = rootImportTask; 239 } 240 241 protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer, 242 boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) { 243 if (rootImportTask == null) { 244 setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer, 245 skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName)); 246 } else { 247 rootImportTask.setInputSource(importSource); 248 rootImportTask.setTargetFolder(targetContainer); 249 rootImportTask.setSkipContainerCreation(skipRootContainerCreation); 250 rootImportTask.setRsLogger(log); 251 rootImportTask.setFactory(getFactory()); 252 rootImportTask.setThreadPolicy(getThreadPolicy()); 253 rootImportTask.setJobName(jobName); 254 rootImportTask.setBatchSize(batchSize); 255 } 256 rootImportTask.addListeners(listeners); 257 rootImportTask.addImportingDocumentFilters(importingDocumentFilters); 258 rootImportTask.setTransactionTimeout(transactionTimeout); 259 return rootImportTask; 260 } 261 262 /** 263 * Creates non-daemon threads at normal priority. 264 */ 265 public static class NamedThreadFactory implements ThreadFactory { 266 267 private final AtomicInteger threadNumber = new AtomicInteger(); 268 269 private final ThreadGroup group; 270 271 private final String prefix; 272 273 public NamedThreadFactory(String prefix) { 274 SecurityManager sm = System.getSecurityManager(); 275 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 276 this.prefix = prefix; 277 } 278 279 @Override 280 public Thread newThread(Runnable r) { 281 String name = prefix + threadNumber.incrementAndGet(); 282 Thread thread = new Thread(group, r, name); 283 // do not set daemon 284 thread.setPriority(Thread.NORM_PRIORITY); 285 return thread; 286 } 287 } 288 289 protected void doRun() throws IOException { 290 291 targetContainer = getTargetContainer(); 292 293 nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 294 295 importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS, 296 new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-")); 297 298 initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName); 299 300 rootImportTask.setRootTask(); 301 long t0 = System.currentTimeMillis(); 302 303 notifyBeforeImport(); 304 305 importTP.execute(rootImportTask); 306 sleep(200); 307 int activeTasks = importTP.getActiveCount(); 308 int oldActiveTasks = 0; 309 long lastLogProgressTime = System.currentTimeMillis(); 310 long lastCreatedDocCounter = 0; 311 312 PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null; 313 while (activeTasks > 0) { 314 sleep(500); 315 activeTasks = importTP.getActiveCount(); 316 boolean logProgress = false; 317 if (oldActiveTasks != activeTasks) { 318 oldActiveTasks = activeTasks; 319 log.debug("currently " + activeTasks + " active import Threads"); 320 logProgress = true; 321 322 } 323 long ti = System.currentTimeMillis(); 324 if (ti - lastLogProgressTime > 5000) { 325 logProgress = true; 326 } 327 if (logProgress) { 328 long inbCreatedDocs = getCreatedDocsCounter(); 329 long deltaT = ti - lastLogProgressTime; 330 double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0)); 331 double imediateSpeed = averageSpeed; 332 if (deltaT > 0) { 333 imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT)); 334 } 335 log.info(inbCreatedDocs + " docs created"); 336 log.info("average speed = " + averageSpeed + " docs/s"); 337 log.info("immediate speed = " + imediateSpeed + " docs/s"); 338 339 if (enablePerfLogging) { 340 Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed }; 341 perfLogger.log(perfData); 342 } 343 344 lastLogProgressTime = ti; 345 lastCreatedDocCounter = inbCreatedDocs; 346 } 347 } 348 stopImportProcrocess(); 349 log.info("All Threads terminated"); 350 if (enablePerfLogging) { 351 perfLogger.release(); 352 } 353 notifyAfterImport(); 354 355 long t1 = System.currentTimeMillis(); 356 long nbCreatedDocs = getCreatedDocsCounter(); 357 log.info(nbCreatedDocs + " docs created"); 358 log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s"); 359 for (String k : nbCreatedDocsByThreads.keySet()) { 360 log.info(k + " --> " + nbCreatedDocsByThreads.get(k)); 361 } 362 Stopwatch stopwatch; 363 for (String name : SimonManager.simonNames()) { 364 if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) { 365 continue; 366 } 367 stopwatch = SimonManager.getStopwatch(name); 368 if (stopwatch.getCounter() > 0) { 369 log.info(stopwatch.toString()); 370 } 371 } 372 373 } 374 375 protected static void sleep(int millis) { 376 try { 377 Thread.sleep(millis); 378 } catch (InterruptedException e) { 379 // restore interrupt status 380 Thread.currentThread().interrupt(); 381 throw new NuxeoException(e); 382 } 383 } 384 385 protected DocumentModel getTargetContainer() { 386 if (targetContainer == null) { 387 targetContainer = createTargetContainer(); 388 } 389 return targetContainer; 390 } 391 392 /** 393 * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be 394 * overridden in subclasses. 395 * 396 * @return 397 */ 398 protected DocumentModel createTargetContainer() { 399 try { 400 return session.getDocument(new PathRef(importWritePath)); 401 } catch (DocumentNotFoundException e) { 402 log.error(e.getMessage()); 403 throw e; 404 } 405 } 406 407 public ImporterThreadingPolicy getThreadPolicy() { 408 if (threadPolicy == null) { 409 threadPolicy = new DefaultMultiThreadingPolicy(); 410 } 411 return threadPolicy; 412 } 413 414 public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 415 this.threadPolicy = threadPolicy; 416 } 417 418 public ImporterDocumentModelFactory getFactory() { 419 if (factory == null) { 420 factory = new DefaultDocumentModelFactory(); 421 } 422 return factory; 423 } 424 425 public void setFactory(ImporterDocumentModelFactory factory) { 426 this.factory = factory; 427 } 428 429 /** 430 * @since 5.9.4 431 */ 432 public void setTransactionTimeout(int transactionTimeout) { 433 this.transactionTimeout = transactionTimeout; 434 } 435 436 public void setEnablePerfLogging(boolean enablePerfLogging) { 437 this.enablePerfLogging = enablePerfLogging; 438 } 439 440 public void stopImportProcrocess() { 441 if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) { 442 importTP.shutdownNow(); 443 } 444 } 445 446 protected void notifyBeforeImport() { 447 for (ImporterListener listener : listeners) { 448 listener.beforeImport(); 449 } 450 } 451 452 protected void notifyAfterImport() { 453 for (ImporterListener listener : listeners) { 454 listener.afterImport(); 455 } 456 } 457 458 /** 459 * @since 7.1 460 */ 461 public String getRepositoryName() { 462 return repositoryName; 463 } 464 465 /** 466 * @since 7.1 467 */ 468 public void setRepositoryName(String repositoryName) { 469 this.repositoryName = repositoryName; 470 } 471 472}