/*
 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo - initial API and implementation
 *
 * $Id$
 */

package org.nuxeo.ecm.platform.importer.base;

import org.javasimon.SimonManager;
import org.javasimon.Stopwatch;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.log.PerfLogger;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
import org.nuxeo.runtime.transaction.TransactionHelper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Generic importer.
 * <p>
 * Submits a root {@link GenericThreadedImportTask} to a fixed-size thread pool, then polls the pool until no task is
 * active any more, logging the number of created documents and the import speed along the way.
 * <p>
 * The thread pool and the created-documents counters are held in static fields, so they are shared by every instance
 * of this class.
 *
 * @author Thierry Delprat
 */
public class GenericMultiThreadedImporter implements ImporterRunner {

    /** Thread pool running the import tasks; replaced on every call to {@link #doRun()}. */
    protected static ThreadPoolExecutor importTP;

    /** Number of created documents, keyed by {@code <thread name>-<task id>}. */
    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<>();

    protected ImporterThreadingPolicy threadPolicy;

    protected ImporterDocumentModelFactory factory;

    protected SourceNode importSource;

    protected DocumentModel targetContainer;

    protected Integer batchSize = 50;

    protected Integer nbThreads = 5;

    protected Integer transactionTimeout = 0;

    protected ImporterLogger log;

    protected CoreSession session;

    protected String importWritePath;

    protected Boolean skipRootContainerCreation = false;

    protected String jobName;

    protected boolean enablePerfLogging = true;

    protected List<ImporterFilter> filters = new ArrayList<>();

    protected List<ImporterListener> listeners = new ArrayList<>();

    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<>();

    protected GenericThreadedImportTask rootImportTask;

    protected static final int DEFAULT_QUEUE_SIZE = 10000;

    /** Capacity of the work queue backing the import thread pool. */
    protected int queueSize = DEFAULT_QUEUE_SIZE;

    protected String repositoryName;

    // Column names handed to the PerfLogger; the values are runtime data and must stay as they are.
    protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" };

    /**
     * Returns the shared import thread pool.
     *
     * @return the pool created by the last {@link #doRun()}, or {@code null} if no import was started yet
     */
    public static ThreadPoolExecutor getExecutor() {
        return importTP;
    }

    /**
     * Records the number of documents created by a task, under a key made of the current thread name and the task id.
     * A later call from the same thread with the same task id overwrites the previous value.
     *
     * @param taskId the identifier of the reporting task
     * @param nbDocs the number of created documents to store for that task
     */
    public static void addCreatedDoc(String taskId, long nbDocs) {
        String tid = Thread.currentThread().getName();
        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
    }

    /**
     * Sums the counters recorded through {@link #addCreatedDoc(String, long)}.
     *
     * @return the total number of created documents currently recorded
     */
    public static long getCreatedDocsCounter() {
        long counter = 0;
        for (Long threadCounter : nbCreatedDocsByThreads.values()) {
            // the field is a plain Map that subclasses may replace, so stay tolerant to null values
            if (threadCounter != null) {
                counter += threadCounter;
            }
        }
        return counter;
    }

    /**
     * Builds an importer with an explicit work queue capacity.
     *
     * @param sourceNode the root node to import
     * @param importWritePath the repository path of the document under which the import is done
     * @param skipRootContainerCreation forwarded to the root import task; the default is kept when {@code null}
     * @param batchSize forwarded to the root import task; the default is kept when {@code null}
     * @param nbThreads the size of the import thread pool; the default is kept when {@code null}
     * @param log the logger used to report progress and errors
     * @param queueSize the capacity of the thread pool work queue; a non-positive value keeps
     *            {@link #DEFAULT_QUEUE_SIZE}
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log,
            int queueSize) {
        importSource = sourceNode;
        this.importWritePath = importWritePath;
        this.log = log;
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
        if (nbThreads != null) {
            this.nbThreads = nbThreads;
        }
        if (skipRootContainerCreation != null) {
            this.skipRootContainerCreation = skipRootContainerCreation;
        }
        // LinkedBlockingQueue rejects a capacity that is not strictly positive, so only accept usable values
        if (queueSize > 0) {
            this.queueSize = queueSize;
        }
    }

    /**
     * Builds an importer using {@link #DEFAULT_QUEUE_SIZE} as work queue capacity.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
    }

    /**
     * Builds an importer with {@code skipRootContainerCreation} set to {@code false}.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
    }

    /**
     * Builds an importer bound to a job name. When {@code jobName} is not {@code null}, a
     * {@link JobHistoryListener} for that job is registered.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName,
            ImporterLogger log) {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
        this.jobName = jobName;
        if (jobName != null) {
            listeners.add(new JobHistoryListener(jobName));
        }
    }

    /**
     * Builds an importer bound to a job name, with {@code skipRootContainerCreation} set to {@code false}.
     */
    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
            Integer nbThreads, String jobName, ImporterLogger log) {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
    }

    /**
     * Builds an importer from a configuration object, also taking the repository name from it.
     */
    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
        repositoryName = configuration.repositoryName;
    }

    /**
     * Registers a filter whose before/after callbacks are invoked around the import in {@link #run()}.
     */
    public void addFilter(ImporterFilter filter) {
        log.debug(String.format(
                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
                filter.toString(), this.hashCode(), importSource.getName()));
        filters.add(filter);
    }

    public void addListeners(ImporterListener... listeners) {
        addListeners(Arrays.asList(listeners));
    }

    public void addListeners(Collection<ImporterListener> listeners) {
        this.listeners.addAll(listeners);
    }

    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
    }

    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
        this.importingDocumentFilters.addAll(importingDocumentFilters);
    }

    /**
     * Runs the import.
     * <p>
     * Starts a transaction if none is active, opens a system session on the configured repository, calls
     * {@code handleBeforeImport()} on every registered filter and then delegates to {@link #doRun()}. Any exception
     * is logged rather than rethrown from the catch block, and is handed to {@code handleAfterImport(Exception)} of
     * every filter ({@code null} when the import succeeded).
     */
    @Override
    public void run() {
        Exception finalException = null;
        if (!TransactionHelper.isTransactionActive()) {
            TransactionHelper.startTransaction();
        }
        try {
            session = CoreInstance.getCoreSessionSystem(repositoryName);
            for (ImporterFilter filter : filters) {
                log.debug(String.format(
                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
                        filter.toString(), this.hashCode(), importSource.getName()));
                filter.handleBeforeImport();
            }
            if (filters.isEmpty()) {
                log.debug(String.format(
                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
                        this.hashCode(), importSource.getName()));
            }
            doRun();
        } catch (Exception e) { // deals with interrupt below
            ExceptionUtils.checkInterrupt(e);
            log.error("Task exec failed", e);
            finalException = e;
        } finally {
            for (ImporterFilter filter : filters) {
                filter.handleAfterImport(finalException);
            }
        }
    }

    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
        this.rootImportTask = rootImportTask;
    }

    /**
     * Prepares the root import task.
     * <p>
     * A new task is created when none was provided through {@link #setRootImportTask(GenericThreadedImportTask)};
     * otherwise the provided task is reconfigured with the given parameters. In both cases the registered listeners,
     * importing document filters and transaction timeout are then pushed to the task.
     *
     * @return the configured root task, which is also stored in {@link #rootImportTask}
     */
    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
        if (rootImportTask == null) {
            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
        } else {
            rootImportTask.setInputSource(importSource);
            rootImportTask.setTargetFolder(targetContainer);
            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
            rootImportTask.setRsLogger(log);
            rootImportTask.setFactory(getFactory());
            rootImportTask.setThreadPolicy(getThreadPolicy());
            rootImportTask.setJobName(jobName);
            rootImportTask.setBatchSize(batchSize);
        }
        rootImportTask.addListeners(listeners);
        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
        rootImportTask.setTransactionTimeout(transactionTimeout);
        return rootImportTask;
    }

    /**
     * Creates threads named {@code <prefix><n>} at normal priority, in the thread group of the security manager when
     * there is one and in the group of the constructing thread otherwise. The daemon flag is deliberately left
     * untouched.
     */
    public static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }

    /**
     * Executes the import and blocks until the thread pool has no active task left.
     * <p>
     * The created-documents counters are reset and a new thread pool is installed, then the root task is submitted.
     * The pool is polled every 500 ms; progress is logged whenever the number of active threads changes or more than
     * 5 seconds have elapsed since the last progress log. Once the pool is idle it is shut down, listeners are
     * notified and final statistics are logged.
     */
    protected void doRun() throws IOException {

        targetContainer = getTargetContainer();

        nbCreatedDocsByThreads.clear();

        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));

        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);

        rootImportTask.setRootTask();
        long t0 = System.currentTimeMillis();

        notifyBeforeImport();

        importTP.execute(rootImportTask);
        // leave the root task some time to start before the first activity check
        sleep(200);
        int activeTasks = importTP.getActiveCount();
        int oldActiveTasks = 0;
        long lastLogProgressTime = System.currentTimeMillis();
        long lastCreatedDocCounter = 0;

        // decided once: later checks rely on this local so that toggling the flag mid-run cannot cause an NPE
        PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null;
        while (activeTasks > 0) {
            sleep(500);
            activeTasks = importTP.getActiveCount();
            boolean logProgress = false;
            if (oldActiveTasks != activeTasks) {
                oldActiveTasks = activeTasks;
                log.debug("currently " + activeTasks + " active import Threads");
                logProgress = true;
            }
            long ti = System.currentTimeMillis();
            if (ti - lastLogProgressTime > 5000) {
                logProgress = true;
            }
            if (logProgress) {
                long inbCreatedDocs = getCreatedDocsCounter();
                long deltaT = ti - lastLogProgressTime;
                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
                double imediateSpeed = averageSpeed;
                if (deltaT > 0) {
                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
                }
                log.info(inbCreatedDocs + " docs created");
                log.info("average speed = " + averageSpeed + " docs/s");
                log.info("immediate speed = " + imediateSpeed + " docs/s");

                if (perfLogger != null) {
                    Double[] perfData = { Double.valueOf(inbCreatedDocs), averageSpeed, imediateSpeed };
                    perfLogger.log(perfData);
                }

                lastLogProgressTime = ti;
                lastCreatedDocCounter = inbCreatedDocs;
            }
        }
        stopImportProcrocess();
        log.info("All Threads terminated");
        if (perfLogger != null) {
            perfLogger.release();
        }
        notifyAfterImport();

        long t1 = System.currentTimeMillis();
        long nbCreatedDocs = getCreatedDocsCounter();
        log.info(nbCreatedDocs + " docs created");
        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
        for (Map.Entry<String, Long> entry : nbCreatedDocsByThreads.entrySet()) {
            log.info(entry.getKey() + " --> " + entry.getValue());
        }
        // report the importer stopwatches that were actually used
        for (String name : SimonManager.simonNames()) {
            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
                continue;
            }
            Stopwatch stopwatch = SimonManager.getStopwatch(name);
            if (stopwatch.getCounter() > 0) {
                log.info(stopwatch.toString());
            }
        }

    }

    /**
     * Sleeps for the given duration.
     *
     * @param millis the time to sleep, in milliseconds
     * @throws NuxeoException if the thread is interrupted while sleeping; the interrupt flag is restored first
     */
    protected static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns the target container, resolving it through {@link #createTargetContainer()} on first access.
     */
    protected DocumentModel getTargetContainer() {
        if (targetContainer == null) {
            targetContainer = createTargetContainer();
        }
        return targetContainer;
    }

    /**
     * Provides the target container where the import will be done. This implementation fetches the document located
     * at the import write path with the current session. Can be overridden in subclasses.
     *
     * @throws DocumentNotFoundException if no document exists at the import write path; the error message is logged
     *             before the exception is rethrown
     */
    protected DocumentModel createTargetContainer() {
        try {
            return session.getDocument(new PathRef(importWritePath));
        } catch (DocumentNotFoundException e) {
            log.error(e.getMessage());
            throw e;
        }
    }

    /**
     * Returns the threading policy, falling back to a {@link DefaultMultiThreadingPolicy} when none was set.
     */
    public ImporterThreadingPolicy getThreadPolicy() {
        if (threadPolicy == null) {
            threadPolicy = new DefaultMultiThreadingPolicy();
        }
        return threadPolicy;
    }

    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
        this.threadPolicy = threadPolicy;
    }

    /**
     * Returns the document model factory, falling back to a {@link DefaultDocumentModelFactory} when none was set.
     */
    public ImporterDocumentModelFactory getFactory() {
        if (factory == null) {
            factory = new DefaultDocumentModelFactory();
        }
        return factory;
    }

    public void setFactory(ImporterDocumentModelFactory factory) {
        this.factory = factory;
    }

    /**
     * @since 5.9.4
     */
    public void setTransactionTimeout(int transactionTimeout) {
        this.transactionTimeout = transactionTimeout;
    }

    public void setEnablePerfLogging(boolean enablePerfLogging) {
        this.enablePerfLogging = enablePerfLogging;
    }

    /**
     * Shuts the shared import thread pool down immediately, unless it does not exist or is already terminating or
     * terminated.
     */
    @Override
    public void stopImportProcrocess() {
        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
            importTP.shutdownNow();
        }
    }

    protected void notifyBeforeImport() {
        for (ImporterListener listener : listeners) {
            listener.beforeImport();
        }
    }

    protected void notifyAfterImport() {
        for (ImporterListener listener : listeners) {
            listener.afterImport();
        }
    }

    /**
     * @since 7.1
     */
    public String getRepositoryName() {
        return repositoryName;
    }

    /**
     * @since 7.1
     */
    public void setRepositoryName(String repositoryName) {
        this.repositoryName = repositoryName;
    }

}