001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036 037import org.javasimon.SimonManager; 038import org.javasimon.Stopwatch; 039import org.nuxeo.common.utils.ExceptionUtils; 040import org.nuxeo.ecm.core.api.CoreInstance; 041import org.nuxeo.ecm.core.api.CoreSession; 042import org.nuxeo.ecm.core.api.DocumentModel; 043import org.nuxeo.ecm.core.api.DocumentNotFoundException; 044import org.nuxeo.ecm.core.api.NuxeoException; 045import org.nuxeo.ecm.core.api.PathRef; 046import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory; 047import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 048import org.nuxeo.ecm.platform.importer.filter.ImporterFilter; 049import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 050import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 051import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener; 052import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 053import org.nuxeo.ecm.platform.importer.log.PerfLogger; 054import org.nuxeo.ecm.platform.importer.source.SourceNode; 055import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy; 056import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 057 058/** 059 * Generic importer 060 * 061 * @author Thierry Delprat 062 */ 063public class GenericMultiThreadedImporter implements ImporterRunner { 064 065 protected static ThreadPoolExecutor importTP; 066 067 protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 068 069 protected ImporterThreadingPolicy threadPolicy; 070 071 protected ImporterDocumentModelFactory factory; 072 073 protected SourceNode importSource; 074 075 protected DocumentModel targetContainer; 076 077 protected Integer batchSize = 50; 078 079 protected Integer nbThreads = 5; 080 081 protected Integer transactionTimeout = 0; 082 083 protected ImporterLogger log; 084 085 protected CoreSession session; 086 087 protected String importWritePath; 088 089 protected Boolean skipRootContainerCreation = false; 090 091 protected String jobName; 092 093 protected boolean enablePerfLogging = true; 094 095 protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>(); 096 097 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 098 099 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 100 101 protected GenericThreadedImportTask rootImportTask; 102 103 protected final static int DEFAULT_QUEUE_SIZE = 10000; 104 105 protected int queueSize = DEFAULT_QUEUE_SIZE; 106 107 protected String repositoryName; 108 109 public static ThreadPoolExecutor getExecutor() { 110 return importTP; 111 } 112 113 public static synchronized void addCreatedDoc(String taskId, long nbDocs) { 114 String tid = Thread.currentThread().getName(); 115 nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs); 116 } 117 118 public static synchronized long getCreatedDocsCounter() { 119 long counter = 0; 120 for (String tid : nbCreatedDocsByThreads.keySet()) { 121 Long tCounter = nbCreatedDocsByThreads.get(tid); 122 if (tCounter != null) { 123 counter += tCounter; 124 } 125 } 126 return counter; 127 } 128 129 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 130 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) { 131 importSource = sourceNode; 132 this.importWritePath = importWritePath; 133 this.log = log; 134 if (batchSize != null) { 135 this.batchSize = batchSize; 136 } 137 if (nbThreads != null) { 138 this.nbThreads = nbThreads; 139 } 140 if (skipRootContainerCreation != null) { 141 this.skipRootContainerCreation = skipRootContainerCreation; 142 } 143 } 144 145 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 146 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) { 147 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE); 148 } 149 150 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 151 Integer nbThreads, ImporterLogger log) { 152 this(sourceNode, importWritePath, false, batchSize, nbThreads, log); 153 } 154 155 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, 156 Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) { 157 158 this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log); 159 this.jobName = jobName; 160 if (jobName != null) { 161 listeners.add(new JobHistoryListener(jobName)); 162 } 163 } 164 165 public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, 166 Integer nbThreads, String jobName, ImporterLogger log) { 167 this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log); 168 } 169 170 public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) { 171 this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation, 172 configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log); 173 repositoryName = configuration.repositoryName; 174 } 175 176 public void addFilter(ImporterFilter filter) { 177 log.debug(String.format( 178 "Filter with %s, was added on the importer with the hash code %s. The source node name is %s", 179 filter.toString(), this.hashCode(), importSource.getName())); 180 filters.add(filter); 181 } 182 183 public void addListeners(ImporterListener... listeners) { 184 addListeners(Arrays.asList(listeners)); 185 } 186 187 public void addListeners(Collection<ImporterListener> listeners) { 188 this.listeners.addAll(listeners); 189 } 190 191 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 192 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 193 } 194 195 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 196 this.importingDocumentFilters.addAll(importingDocumentFilters); 197 } 198 199 @Override 200 public void run() { 201 Exception finalException = null; 202 try { 203 session = CoreInstance.openCoreSessionSystem(repositoryName); 204 for (ImporterFilter filter : filters) { 205 log.debug(String.format( 206 "Running filter with %s, on the importer with the hash code %s. The source node name is %s", 207 filter.toString(), this.hashCode(), importSource.getName())); 208 filter.handleBeforeImport(); 209 } 210 if (filters.size() == 0) { 211 log.debug(String.format( 212 "No filters are registered on the importer with hash code %s, while importing the source node with name %s", 213 this.hashCode(), importSource.getName())); 214 } 215 doRun(); 216 } catch (Exception e) { // deals with interrupt below 217 ExceptionUtils.checkInterrupt(e); 218 log.error("Task exec failed", e); 219 finalException = e; 220 } finally { 221 for (ImporterFilter filter : filters) { 222 filter.handleAfterImport(finalException); 223 } 224 if (session != null) { 225 session.close(); 226 session = null; 227 } 228 } 229 } 230 231 public void setRootImportTask(GenericThreadedImportTask rootImportTask) { 232 this.rootImportTask = rootImportTask; 233 } 234 235 protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer, 236 boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) { 237 if (rootImportTask == null) { 238 setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer, 239 skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName)); 240 } else { 241 rootImportTask.setInputSource(importSource); 242 rootImportTask.setTargetFolder(targetContainer); 243 rootImportTask.setSkipContainerCreation(skipRootContainerCreation); 244 rootImportTask.setRsLogger(log); 245 rootImportTask.setFactory(getFactory()); 246 rootImportTask.setThreadPolicy(getThreadPolicy()); 247 rootImportTask.setJobName(jobName); 248 rootImportTask.setBatchSize(batchSize); 249 } 250 rootImportTask.addListeners(listeners); 251 rootImportTask.addImportingDocumentFilters(importingDocumentFilters); 252 rootImportTask.setTransactionTimeout(transactionTimeout); 253 return rootImportTask; 254 } 255 256 /** 257 * Creates non-daemon threads at normal priority. 258 */ 259 public static class NamedThreadFactory implements ThreadFactory { 260 261 private final AtomicInteger threadNumber = new AtomicInteger(); 262 263 private final ThreadGroup group; 264 265 private final String prefix; 266 267 public NamedThreadFactory(String prefix) { 268 SecurityManager sm = System.getSecurityManager(); 269 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 270 this.prefix = prefix; 271 } 272 273 @Override 274 public Thread newThread(Runnable r) { 275 String name = prefix + threadNumber.incrementAndGet(); 276 Thread thread = new Thread(group, r, name); 277 // do not set daemon 278 thread.setPriority(Thread.NORM_PRIORITY); 279 return thread; 280 } 281 } 282 283 protected void doRun() throws IOException { 284 285 targetContainer = getTargetContainer(); 286 287 nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>(); 288 289 importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS, 290 new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-")); 291 292 initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName); 293 294 rootImportTask.setRootTask(); 295 long t0 = System.currentTimeMillis(); 296 297 notifyBeforeImport(); 298 299 importTP.execute(rootImportTask); 300 sleep(200); 301 int activeTasks = importTP.getActiveCount(); 302 int oldActiveTasks = 0; 303 long lastLogProgressTime = System.currentTimeMillis(); 304 long lastCreatedDocCounter = 0; 305 306 String[] headers = { "nbDocs", "average", "imediate" }; 307 PerfLogger perfLogger = new PerfLogger(headers); 308 while (activeTasks > 0) { 309 sleep(500); 310 activeTasks = importTP.getActiveCount(); 311 boolean logProgress = false; 312 if (oldActiveTasks != activeTasks) { 313 oldActiveTasks = activeTasks; 314 log.debug("currently " + activeTasks + " active import Threads"); 315 logProgress = true; 316 317 } 318 long ti = System.currentTimeMillis(); 319 if (ti - lastLogProgressTime > 5000) { 320 logProgress = true; 321 } 322 if (logProgress) { 323 long inbCreatedDocs = getCreatedDocsCounter(); 324 long deltaT = ti - lastLogProgressTime; 325 double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0)); 326 double imediateSpeed = averageSpeed; 327 if (deltaT > 0) { 328 imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT)); 329 } 330 log.info(inbCreatedDocs + " docs created"); 331 log.info("average speed = " + averageSpeed + " docs/s"); 332 log.info("immediate speed = " + imediateSpeed + " docs/s"); 333 334 if (enablePerfLogging) { 335 Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed }; 336 perfLogger.log(perfData); 337 } 338 339 lastLogProgressTime = ti; 340 lastCreatedDocCounter = inbCreatedDocs; 341 } 342 } 343 stopImportProcrocess(); 344 log.info("All Threads terminated"); 345 perfLogger.release(); 346 notifyAfterImport(); 347 348 long t1 = System.currentTimeMillis(); 349 long nbCreatedDocs = getCreatedDocsCounter(); 350 log.info(nbCreatedDocs + " docs created"); 351 log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s"); 352 for (String k : nbCreatedDocsByThreads.keySet()) { 353 log.info(k + " --> " + nbCreatedDocsByThreads.get(k)); 354 } 355 Stopwatch stopwatch; 356 for (String name : SimonManager.simonNames()) { 357 if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) { 358 continue; 359 } 360 stopwatch = SimonManager.getStopwatch(name); 361 if (stopwatch.getCounter() > 0) { 362 log.info(stopwatch.toString()); 363 } 364 } 365 366 } 367 368 protected static void sleep(int millis) { 369 try { 370 Thread.sleep(millis); 371 } catch (InterruptedException e) { 372 // restore interrupt status 373 Thread.currentThread().interrupt(); 374 throw new NuxeoException(e); 375 } 376 } 377 378 protected DocumentModel getTargetContainer() { 379 if (targetContainer == null) { 380 targetContainer = createTargetContainer(); 381 } 382 return targetContainer; 383 } 384 385 /** 386 * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be 387 * overridden in subclasses. 388 * 389 * @return 390 */ 391 protected DocumentModel createTargetContainer() { 392 try { 393 return session.getDocument(new PathRef(importWritePath)); 394 } catch (DocumentNotFoundException e) { 395 log.error(e.getMessage()); 396 throw e; 397 } 398 } 399 400 public ImporterThreadingPolicy getThreadPolicy() { 401 if (threadPolicy == null) { 402 threadPolicy = new DefaultMultiThreadingPolicy(); 403 } 404 return threadPolicy; 405 } 406 407 public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 408 this.threadPolicy = threadPolicy; 409 } 410 411 public ImporterDocumentModelFactory getFactory() { 412 if (factory == null) { 413 factory = new DefaultDocumentModelFactory(); 414 } 415 return factory; 416 } 417 418 public void setFactory(ImporterDocumentModelFactory factory) { 419 this.factory = factory; 420 } 421 422 /** 423 * @since 5.9.4 424 */ 425 public void setTransactionTimeout(int transactionTimeout) { 426 this.transactionTimeout = transactionTimeout; 427 } 428 429 public void setEnablePerfLogging(boolean enablePerfLogging) { 430 this.enablePerfLogging = enablePerfLogging; 431 } 432 433 public void stopImportProcrocess() { 434 if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) { 435 importTP.shutdownNow(); 436 } 437 } 438 439 protected void notifyBeforeImport() { 440 for (ImporterListener listener : listeners) { 441 listener.beforeImport(); 442 } 443 } 444 445 protected void notifyAfterImport() { 446 for (ImporterListener listener : listeners) { 447 listener.afterImport(); 448 } 449 } 450 451 /** 452 * @since 7.1 453 */ 454 public String getRepositoryName() { 455 return repositoryName; 456 } 457 458 /** 459 * @since 7.1 460 */ 461 public void setRepositoryName(String repositoryName) { 462 this.repositoryName = repositoryName; 463 } 464 465}