/*
 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo - initial API and implementation
 *
 * $Id$
 */

package org.nuxeo.ecm.platform.importer.base;

import org.javasimon.SimonManager;
import org.javasimon.Stopwatch;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.log.PerfLogger;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
import org.nuxeo.runtime.transaction.TransactionHelper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Generic importer.
 * <p>
 * Runs a root {@link GenericThreadedImportTask} on a fixed-size thread pool and monitors the pool until no task is
 * active any more, periodically logging the number of created documents and the import speed.
 * <p>
 * Note: the thread pool ({@link #importTP}) and the per-thread counters ({@link #nbCreatedDocsByThreads}) are
 * static and are re-created on every {@link #doRun()}, so they are shared by all importer instances of the JVM.
 *
 * @author Thierry Delprat
 */
public class GenericMultiThreadedImporter implements ImporterRunner {

    /** Executor running the import tasks; replaced on each {@link #doRun()}. */
    protected static ThreadPoolExecutor importTP;

    /** Number of created documents, keyed by {@code <thread name>-<task id>}; replaced on each {@link #doRun()}. */
    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();

    protected ImporterThreadingPolicy threadPolicy;

    protected ImporterDocumentModelFactory factory;

    protected SourceNode importSource;

    protected DocumentModel targetContainer;

    protected Integer batchSize = 50;

    protected Integer nbThreads = 5;

    protected Integer transactionTimeout = 0;

    protected ImporterLogger log;

    protected CoreSession session;

    protected String importWritePath;

    protected Boolean skipRootContainerCreation = false;

    protected String jobName;

    protected boolean enablePerfLogging = true;

    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();

    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();

    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();

    protected GenericThreadedImportTask rootImportTask;

    protected final static int DEFAULT_QUEUE_SIZE = 10000;

    /** Capacity of the executor's work queue. */
    protected int queueSize = DEFAULT_QUEUE_SIZE;

    protected String repositoryName;

    /**
     * @return the executor created by the last {@link #doRun()}, or {@code null} if no import was started yet
     */
    public static ThreadPoolExecutor getExecutor() {
        return importTP;
    }

    /**
     * Records the number of documents created by a task. The entry is keyed by the name of the calling thread and
     * the given task id, and any previous value for that key is overwritten.
     *
     * @param taskId the id of the reporting task
     * @param nbDocs the number of documents to record for that task
     */
    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
        String tid = Thread.currentThread().getName();
        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
    }

    /**
     * @return the sum of all the counters recorded through {@link #addCreatedDoc(String, long)}
     */
    public static synchronized long getCreatedDocsCounter() {
        long counter = 0;
        for (Long tCounter : nbCreatedDocsByThreads.values()) {
            if (tCounter != null) {
                counter += tCounter;
            }
        }
        return counter;
    }

    /**
     * Main constructor. {@code null} values for {@code skipRootContainerCreation}, {@code batchSize} and
     * {@code nbThreads} leave the corresponding default untouched.
     *
     * @param sourceNode the root node of the source to import
     * @param importWritePath the repository path of the container receiving the import
     * @param skipRootContainerCreation forwarded to the root import task
     * @param batchSize forwarded to the root import task
     * @param nbThreads the size of the import thread pool
     * @param log the logger used by the importer
     * @param queueSize the capacity of the executor's work queue; a non-positive value selects
     *            {@link #DEFAULT_QUEUE_SIZE}
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
        importSource = sourceNode;
        this.importWritePath = importWritePath;
        this.log = log;
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
        if (nbThreads != null) {
            this.nbThreads = nbThreads;
        }
        if (skipRootContainerCreation != null) {
            this.skipRootContainerCreation = skipRootContainerCreation;
        }
        // The parameter used to be silently ignored. LinkedBlockingQueue rejects capacities <= 0, so such values
        // keep the default instead of making doRun() fail.
        this.queueSize = queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE;
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
    }

    /**
     * Same as the 6-argument constructor, and additionally registers a {@link JobHistoryListener} when
     * {@code jobName} is not {@code null}.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {

        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
        this.jobName = jobName;
        if (jobName != null) {
            listeners.add(new JobHistoryListener(jobName));
        }
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, String jobName, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
    }

    /**
     * Builds an importer from a configuration object; the repository name is taken from the configuration too.
     */
    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
        repositoryName = configuration.repositoryName;
    }

    /**
     * Registers a filter whose {@code handleBeforeImport} / {@code handleAfterImport} callbacks are invoked around
     * the import in {@link #run()}.
     */
    public void addFilter(ImporterFilter filter) {
        log.debug(String.format(
                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
                filter.toString(), this.hashCode(), importSource.getName()));
        filters.add(filter);
    }

    public void addListeners(ImporterListener... listeners) {
        addListeners(Arrays.asList(listeners));
    }

    public void addListeners(Collection<ImporterListener> listeners) {
        this.listeners.addAll(listeners);
    }

    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
    }

    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
        this.importingDocumentFilters.addAll(importingDocumentFilters);
    }

    /**
     * Opens a system session on the configured repository, runs the "before import" filters, performs the import
     * and finally runs the "after import" filters and closes the session.
     * <p>
     * Exceptions raised by the import are logged and handed to the filters' {@code handleAfterImport}; they are not
     * rethrown by this method itself (interrupt handling is delegated to {@link ExceptionUtils#checkInterrupt}).
     */
    @Override
    public void run() {
        Exception finalException = null;
        try {
            if (!TransactionHelper.isTransactionActive()) {
                TransactionHelper.startTransaction();
            }
            session = CoreInstance.openCoreSessionSystem(repositoryName);
            for (ImporterFilter filter : filters) {
                log.debug(String.format(
                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
                        filter.toString(), this.hashCode(), importSource.getName()));
                filter.handleBeforeImport();
            }
            if (filters.isEmpty()) {
                log.debug(String.format(
                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
                        this.hashCode(), importSource.getName()));
            }
            doRun();
        } catch (Exception e) { // deals with interrupt below
            ExceptionUtils.checkInterrupt(e);
            log.error("Task exec failed", e);
            finalException = e;
        } finally {
            try {
                for (ImporterFilter filter : filters) {
                    filter.handleAfterImport(finalException);
                }
            } finally {
                // always close the session, even when a filter callback fails
                if (session != null) {
                    session.close();
                    session = null;
                }
            }
        }
    }

    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
        this.rootImportTask = rootImportTask;
    }

    /**
     * Creates the root import task, or re-configures the one previously set through
     * {@link #setRootImportTask(GenericThreadedImportTask)}, then attaches the registered listeners, the importing
     * document filters and the transaction timeout to it.
     *
     * @return the configured root import task
     */
    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
        if (rootImportTask == null) {
            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
        } else {
            rootImportTask.setInputSource(importSource);
            rootImportTask.setTargetFolder(targetContainer);
            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
            rootImportTask.setRsLogger(log);
            rootImportTask.setFactory(getFactory());
            rootImportTask.setThreadPolicy(getThreadPolicy());
            rootImportTask.setJobName(jobName);
            rootImportTask.setBatchSize(batchSize);
        }
        rootImportTask.addListeners(listeners);
        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
        rootImportTask.setTransactionTimeout(transactionTimeout);
        return rootImportTask;
    }

    /**
     * Creates non-daemon threads at normal priority.
     */
    public static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        /**
         * @param prefix the prefix of the thread names; a sequence number starting at 1 is appended to it
         */
        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }

    /**
     * Performs the import: resets the static counters and thread pool, submits the root task, then polls the pool
     * every 500 ms until no task is active, logging progress whenever the number of active tasks changes or more
     * than 5 seconds elapsed since the last progress log. Once done, the pool is shut down, listeners are notified
     * and final statistics are logged.
     * <p>
     * If monitoring fails (for instance because the calling thread is interrupted while sleeping), the pool is
     * still shut down and the perf logger released before the exception propagates.
     */
    protected void doRun() throws IOException {

        targetContainer = getTargetContainer();

        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();

        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));

        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);

        rootImportTask.setRootTask();
        long t0 = System.currentTimeMillis();

        notifyBeforeImport();

        importTP.execute(rootImportTask);
        PerfLogger perfLogger = null;
        try {
            // give the root task some time to start before reading the active count
            sleep(200);
            int activeTasks = importTP.getActiveCount();
            int oldActiveTasks = 0;
            long lastLogProgressTime = System.currentTimeMillis();
            long lastCreatedDocCounter = 0;

            String[] headers = { "nbDocs", "average", "imediate" };
            perfLogger = new PerfLogger(headers);
            while (activeTasks > 0) {
                sleep(500);
                activeTasks = importTP.getActiveCount();
                boolean logProgress = false;
                if (oldActiveTasks != activeTasks) {
                    oldActiveTasks = activeTasks;
                    log.debug("currently " + activeTasks + " active import Threads");
                    logProgress = true;
                }
                long ti = System.currentTimeMillis();
                if (ti - lastLogProgressTime > 5000) {
                    logProgress = true;
                }
                if (logProgress) {
                    long inbCreatedDocs = getCreatedDocsCounter();
                    long deltaT = ti - lastLogProgressTime;
                    double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
                    double imediateSpeed = averageSpeed;
                    if (deltaT > 0) {
                        imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
                    }
                    log.info(inbCreatedDocs + " docs created");
                    log.info("average speed = " + averageSpeed + " docs/s");
                    log.info("immediate speed = " + imediateSpeed + " docs/s");

                    if (enablePerfLogging) {
                        Double[] perfData = { Double.valueOf(inbCreatedDocs), averageSpeed, imediateSpeed };
                        perfLogger.log(perfData);
                    }

                    lastLogProgressTime = ti;
                    lastCreatedDocCounter = inbCreatedDocs;
                }
            }
        } finally {
            // never leave the non-daemon worker threads nor the perf logger behind
            stopImportProcrocess();
            if (perfLogger != null) {
                perfLogger.release();
            }
        }
        log.info("All Threads terminated");
        notifyAfterImport();

        long t1 = System.currentTimeMillis();
        long nbCreatedDocs = getCreatedDocsCounter();
        log.info(nbCreatedDocs + " docs created");
        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
        for (Map.Entry<String, Long> entry : nbCreatedDocsByThreads.entrySet()) {
            log.info(entry.getKey() + " --> " + entry.getValue());
        }
        // dump the importer stopwatches that were actually used
        for (String name : SimonManager.simonNames()) {
            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
                continue;
            }
            Stopwatch stopwatch = SimonManager.getStopwatch(name);
            if (stopwatch.getCounter() > 0) {
                log.info(stopwatch.toString());
            }
        }

    }

    /**
     * Sleeps for the given duration.
     *
     * @param millis the duration, in milliseconds
     * @throws NuxeoException if the thread is interrupted; the interrupt status is restored beforehand
     */
    protected static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    /**
     * @return the target container, created through {@link #createTargetContainer()} on first access
     */
    protected DocumentModel getTargetContainer() {
        if (targetContainer == null) {
            targetContainer = createTargetContainer();
        }
        return targetContainer;
    }

    /**
     * Creates the target container where the import will be done. Can be overridden in subclasses.
     * <p>
     * This implementation fetches the existing document located at {@link #importWritePath}.
     *
     * @return the document located at {@link #importWritePath}
     * @throws DocumentNotFoundException if there is no such document; the error is logged before being rethrown
     */
    protected DocumentModel createTargetContainer() {
        try {
            return session.getDocument(new PathRef(importWritePath));
        } catch (DocumentNotFoundException e) {
            log.error(e.getMessage());
            throw e;
        }
    }

    /**
     * @return the threading policy, defaulting to a {@link DefaultMultiThreadingPolicy} when none was set
     */
    public ImporterThreadingPolicy getThreadPolicy() {
        if (threadPolicy == null) {
            threadPolicy = new DefaultMultiThreadingPolicy();
        }
        return threadPolicy;
    }

    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
        this.threadPolicy = threadPolicy;
    }

    /**
     * @return the document model factory, defaulting to a {@link DefaultDocumentModelFactory} when none was set
     */
    public ImporterDocumentModelFactory getFactory() {
        if (factory == null) {
            factory = new DefaultDocumentModelFactory();
        }
        return factory;
    }

    public void setFactory(ImporterDocumentModelFactory factory) {
        this.factory = factory;
    }

    /**
     * @since 5.9.4
     */
    public void setTransactionTimeout(int transactionTimeout) {
        this.transactionTimeout = transactionTimeout;
    }

    public void setEnablePerfLogging(boolean enablePerfLogging) {
        this.enablePerfLogging = enablePerfLogging;
    }

    /**
     * Shuts the import thread pool down immediately, unless it is already terminated or terminating. The
     * (misspelled) method name is kept for compatibility with existing callers.
     */
    public void stopImportProcrocess() {
        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
            importTP.shutdownNow();
        }
    }

    protected void notifyBeforeImport() {
        for (ImporterListener listener : listeners) {
            listener.beforeImport();
        }
    }

    protected void notifyAfterImport() {
        for (ImporterListener listener : listeners) {
            listener.afterImport();
        }
    }

    /**
     * @since 7.1
     */
    public String getRepositoryName() {
        return repositoryName;
    }

    /**
     * @since 7.1
     */
    public void setRepositoryName(String repositoryName) {
        this.repositoryName = repositoryName;
    }

}