001/* 002 * (C) Copyright 2006-2008 Nuxeo SAS (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Nuxeo - initial API and implementation 016 * 017 * $Id$ 018 */ 019 020package org.nuxeo.ecm.platform.importer.base; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.List; 027import java.util.concurrent.RejectedExecutionException; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.javasimon.SimonManager; 032import org.javasimon.Split; 033import org.javasimon.Stopwatch; 034import org.nuxeo.common.utils.ExceptionUtils; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModel; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.blobholder.BlobHolder; 041import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 042import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 043import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 044import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 045import org.nuxeo.ecm.platform.importer.source.SourceNode; 046import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 047import org.nuxeo.runtime.transaction.TransactionHelper; 048 049/** 050 * Generic importer task 051 * 052 * @author Thierry Delprat 053 */ 054public class GenericThreadedImportTask implements Runnable { 055 056 private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class); 057 058 protected static int taskCounter = 0; 059 060 protected boolean isRunning = false; 061 062 protected long uploadedFiles = 0; 063 064 protected long uploadedKO; 065 066 protected int batchSize; 067 068 protected CoreSession session; 069 070 protected DocumentModel rootDoc; 071 072 protected SourceNode rootSource; 073 074 protected Boolean skipContainerCreation = false; 075 076 protected Boolean isRootTask = false; 077 078 protected String taskId = null; 079 080 public static final int TX_TIMEOUT = 600; 081 082 protected int transactionTimeout = TX_TIMEOUT; 083 084 protected ImporterThreadingPolicy threadPolicy; 085 086 protected ImporterDocumentModelFactory factory; 087 088 protected String jobName; 089 090 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 091 092 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 093 094 protected String repositoryName; 095 096 private static synchronized int getNextTaskId() { 097 taskCounter += 1; 098 return taskCounter; 099 } 100 101 protected ImporterLogger rsLogger = null; 102 103 protected GenericThreadedImportTask(CoreSession session) { 104 this.session = session; 105 uploadedFiles = 0; 106 taskId = "T" + getNextTaskId(); 107 } 108 109 protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc, 110 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 111 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) { 112 this.rsLogger = rsLogger; 113 this.session = session; 114 this.batchSize = batchSize; 115 uploadedFiles = 0; 116 taskId = "T" + getNextTaskId(); 117 this.rootSource = rootSource; 118 this.rootDoc = rootDoc; 119 this.skipContainerCreation = skipContainerCreation; 120 this.factory = factory; 121 this.threadPolicy = threadPolicy; 122 123 // there are documents without path, like versions 124 if (rootSource == null) { 125 throw new IllegalArgumentException("source node must be specified"); 126 } 127 } 128 129 public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc, 130 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 131 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) { 132 this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy); 133 this.jobName = jobName; 134 this.repositoryName = repositoryName; 135 } 136 137 protected CoreSession getCoreSession() { 138 return session; 139 } 140 141 protected void commit() { 142 commit(false); 143 } 144 145 protected void commit(boolean force) { 146 uploadedFiles++; 147 if (uploadedFiles % 10 == 0) { 148 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 149 } 150 151 if (uploadedFiles % batchSize == 0 || force) { 152 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save"); 153 Split split = stopwatch.start(); 154 fslog("Comiting Core Session after " + uploadedFiles + " files", true); 155 session.save(); 156 TransactionHelper.commitOrRollbackTransaction(); 157 TransactionHelper.startTransaction(transactionTimeout); 158 split.stop(); 159 } 160 } 161 162 protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) { 163 if (!shouldImportDocument(node)) { 164 return null; 165 } 166 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder"); 167 Split split = stopwatch.start(); 168 DocumentModel folder = null; 169 try { 170 folder = getFactory().createFolderishNode(session, parent, node); 171 } catch (IOException e) { 172 String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e 173 + (e.getCause() != null ? e.getCause() : ""); 174 fslog(errorMsg, true); 175 log.error(errorMsg); 176 // Process folderish node creation error and check if the global 177 // import task should continue 178 boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node); 179 if (!shouldImportTaskContinue) { 180 throw new NuxeoException(e); 181 } 182 } finally { 183 split.stop(); 184 } 185 if (folder != null) { 186 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 187 fslog("Created Folder " + folder.getName() + " at " + parentPath, true); 188 189 // save session if needed 190 commit(); 191 } 192 return folder; 193 194 } 195 196 protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException { 197 if (!shouldImportDocument(node)) { 198 return null; 199 } 200 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf"); 201 Split split = stopwatch.start(); 202 DocumentModel leaf = null; 203 try { 204 leaf = getFactory().createLeafNode(session, parent, node); 205 } catch (IOException e) { 206 String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e 207 + (e.getCause() != null ? e.getCause() : ""); 208 fslog(errMsg, true); 209 log.error(errMsg); 210 // Process leaf node creation error and check if the global 211 // import task should continue 212 boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node); 213 if (!shouldImportTaskContinue) { 214 throw new NuxeoException(e); 215 } 216 } finally { 217 split.stop(); 218 } 219 BlobHolder bh = node.getBlobHolder(); 220 if (leaf != null && bh != null) { 221 Blob blob = bh.getBlob(); 222 if (blob != null) { 223 long fileSize = blob.getLength(); 224 String fileName = blob.getFilename(); 225 if (fileSize > 0) { 226 long kbSize = fileSize / 1024; 227 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 228 fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName 229 + " of size " + kbSize + "KB", true); 230 } 231 uploadedKO += fileSize; 232 } 233 234 // save session if needed 235 commit(); 236 } 237 return leaf; 238 } 239 240 protected boolean shouldImportDocument(SourceNode node) { 241 for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) { 242 if (!importingDocumentFilter.shouldImportDocument(node)) { 243 return false; 244 } 245 } 246 return true; 247 } 248 249 protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log, 250 Integer batchSize) { 251 GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent, 252 skipContainerCreation, log, batchSize, factory, threadPolicy, null); 253 newTask.addListeners(listeners); 254 newTask.addImportingDocumentFilters(importingDocumentFilters); 255 return newTask; 256 } 257 258 protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) { 259 if (isRootTask) { 260 isRootTask = false; // don't fork Root thread on first folder 261 return null; 262 } 263 int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size(); 264 boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles, 265 batchSize, scheduledTasks); 266 267 if (createTask) { 268 GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize); 269 newTask.setBatchSize(getBatchSize()); 270 newTask.setSkipContainerCreation(true); 271 newTask.setTransactionTimeout(transactionTimeout); 272 return newTask; 273 } else { 274 return null; 275 } 276 } 277 278 protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException { 279 280 if (getFactory().isTargetDocumentModelFolderish(node)) { 281 DocumentModel folder; 282 Boolean newThread = false; 283 if (skipContainerCreation) { 284 folder = parent; 285 skipContainerCreation = false; 286 newThread = true; 287 } else { 288 folder = doCreateFolderishNode(parent, node); 289 if (folder == null) { 290 return; 291 } 292 } 293 294 // get a new TaskImporter if available to start 295 // processing the sub-tree 296 GenericThreadedImportTask task = null; 297 if (!newThread) { 298 task = createNewTaskIfNeeded(folder, node); 299 } 300 if (task != null) { 301 // force comit before starting new thread 302 commit(true); 303 try { 304 GenericMultiThreadedImporter.getExecutor().execute(task); 305 } catch (RejectedExecutionException e) { 306 log.error("Import task rejected", e); 307 } 308 309 } else { 310 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children"); 311 Split split = stopwatch.start(); 312 List<SourceNode> nodes = node.getChildren(); 313 split.stop(); 314 if (nodes != null) { 315 for (SourceNode child : nodes) { 316 recursiveCreateDocumentFromNode(folder, child); 317 } 318 } 319 } 320 } else { 321 doCreateLeafNode(parent, node); 322 } 323 } 324 325 public void setInputSource(SourceNode node) { 326 this.rootSource = node; 327 } 328 329 public void setTargetFolder(DocumentModel rootDoc) { 330 this.rootDoc = rootDoc; 331 } 332 333 // TODO isRunning is not yet handled correctly 334 public boolean isRunning() { 335 synchronized (this) { 336 return isRunning; 337 } 338 } 339 340 public synchronized void run() { 341 TransactionHelper.startTransaction(transactionTimeout); 342 synchronized (this) { 343 if (isRunning) { 344 throw new IllegalStateException("Task already running"); 345 } 346 isRunning = true; 347 // versions have no path, target document can be null 348 if (rootSource == null) { 349 isRunning = false; 350 throw new IllegalArgumentException("source node must be specified"); 351 } 352 } 353 try { 354 session = CoreInstance.openCoreSessionSystem(repositoryName); 355 log.info("Starting new import task"); 356 if (rootDoc != null) { 357 // reopen the root to be sure the session is valid 358 rootDoc = session.getDocument(rootDoc.getRef()); 359 } 360 recursiveCreateDocumentFromNode(rootDoc, rootSource); 361 session.save(); 362 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 363 TransactionHelper.commitOrRollbackTransaction(); 364 } catch (Exception e) { // deals with interrupt below 365 log.error("Error during import", e); 366 ExceptionUtils.checkInterrupt(e); 367 notifyImportError(); 368 } finally { 369 log.info("End of task"); 370 if (session != null) { 371 session.close(); 372 session = null; 373 } 374 synchronized (this) { 375 isRunning = false; 376 } 377 } 378 } 379 380 // This should be done with log4j but I did not find a way to configure it 381 // the way I wanted ... 382 protected void fslog(String msg, boolean debug) { 383 if (debug) { 384 rsLogger.debug(msg); 385 } else { 386 rsLogger.info(msg); 387 } 388 } 389 390 public int getBatchSize() { 391 return batchSize; 392 } 393 394 public void setBatchSize(int batchSize) { 395 this.batchSize = batchSize; 396 } 397 398 public void setSkipContainerCreation(Boolean skipContainerCreation) { 399 this.skipContainerCreation = skipContainerCreation; 400 } 401 402 public void setRootTask() { 403 isRootTask = true; 404 taskCounter = 0; 405 taskId = "T0"; 406 } 407 408 protected ImporterThreadingPolicy getThreadPolicy() { 409 return threadPolicy; 410 } 411 412 protected ImporterDocumentModelFactory getFactory() { 413 return factory; 414 } 415 416 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 417 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 418 } 419 420 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 421 this.importingDocumentFilters.addAll(importingDocumentFilters); 422 } 423 424 public void addListeners(ImporterListener... listeners) { 425 addListeners(Arrays.asList(listeners)); 426 } 427 428 public void addListeners(Collection<ImporterListener> listeners) { 429 this.listeners.addAll(listeners); 430 } 431 432 public void setTransactionTimeout(int transactionTimeout) { 433 this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout; 434 } 435 436 protected void notifyImportError() { 437 for (ImporterListener listener : listeners) { 438 listener.importError(); 439 } 440 } 441 442 protected void setRootDoc(DocumentModel rootDoc) { 443 this.rootDoc = rootDoc; 444 } 445 446 protected void setRootSource(SourceNode rootSource) { 447 this.rootSource = rootSource; 448 } 449 450 protected void setFactory(ImporterDocumentModelFactory factory) { 451 this.factory = factory; 452 } 453 454 protected void setRsLogger(ImporterLogger rsLogger) { 455 this.rsLogger = rsLogger; 456 } 457 458 protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 459 this.threadPolicy = threadPolicy; 460 } 461 462 protected void setJobName(String jobName) { 463 this.jobName = jobName; 464 } 465 466}