/*
 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo - initial API and implementation
 *
 * $Id$
 */

package org.nuxeo.ecm.platform.importer.base;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.javasimon.SimonManager;
import org.javasimon.Split;
import org.javasimon.Stopwatch;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.blobholder.BlobHolder;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventProducer;
import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
import org.nuxeo.ecm.core.event.impl.EventContextImpl;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
050import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 051import org.nuxeo.ecm.platform.importer.source.SourceNode; 052import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 053import org.nuxeo.runtime.api.Framework; 054import org.nuxeo.runtime.transaction.TransactionHelper; 055 056/** 057 * Generic importer task 058 * 059 * @author Thierry Delprat 060 */ 061public class GenericThreadedImportTask implements Runnable { 062 063 public static final String DOC_IMPORTED_EVENT = "documentImportedWithPlatformImporter"; 064 065 private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class); 066 067 protected static int taskCounter = 0; 068 069 protected boolean isRunning = false; 070 071 protected long uploadedFiles = 0; 072 073 protected long uploadedKO; 074 075 protected int batchSize; 076 077 protected CoreSession session; 078 079 protected DocumentModel rootDoc; 080 081 protected SourceNode rootSource; 082 083 protected Boolean skipContainerCreation = false; 084 085 protected Boolean isRootTask = false; 086 087 protected String taskId = null; 088 089 public static final int TX_TIMEOUT = 600; 090 091 protected int transactionTimeout = TX_TIMEOUT; 092 093 protected ImporterThreadingPolicy threadPolicy; 094 095 protected ImporterDocumentModelFactory factory; 096 097 protected String jobName; 098 099 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 100 101 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 102 103 protected String repositoryName; 104 105 private static synchronized int getNextTaskId() { 106 taskCounter += 1; 107 return taskCounter; 108 } 109 110 protected ImporterLogger rsLogger = null; 111 112 protected GenericThreadedImportTask(CoreSession session) { 113 this.session = session; 114 uploadedFiles = 0; 115 taskId = "T" + getNextTaskId(); 116 } 117 118 protected GenericThreadedImportTask(CoreSession session, SourceNode 
rootSource, DocumentModel rootDoc, 119 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 120 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) { 121 this.rsLogger = rsLogger; 122 this.session = session; 123 this.batchSize = batchSize; 124 uploadedFiles = 0; 125 taskId = "T" + getNextTaskId(); 126 this.rootSource = rootSource; 127 this.rootDoc = rootDoc; 128 this.skipContainerCreation = skipContainerCreation; 129 this.factory = factory; 130 this.threadPolicy = threadPolicy; 131 132 // there are documents without path, like versions 133 if (rootSource == null) { 134 throw new IllegalArgumentException("source node must be specified"); 135 } 136 } 137 138 public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc, 139 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 140 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) { 141 this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy); 142 this.jobName = jobName; 143 this.repositoryName = repositoryName; 144 } 145 146 protected CoreSession getCoreSession() { 147 return session; 148 } 149 150 protected void commit() { 151 commit(false); 152 } 153 154 protected void commit(boolean force) { 155 uploadedFiles++; 156 if (uploadedFiles % 10 == 0) { 157 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 158 } 159 160 if (uploadedFiles % batchSize == 0 || force) { 161 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save"); 162 Split split = stopwatch.start(); 163 fslog("Committing Core Session after " + uploadedFiles + " files", true); 164 session.save(); 165 TransactionHelper.commitOrRollbackTransaction(); 166 TransactionHelper.startTransaction(transactionTimeout); 167 split.stop(); 168 } 169 } 170 171 protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode 
node) { 172 if (!shouldImportDocument(node)) { 173 return null; 174 } 175 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder"); 176 Split split = stopwatch.start(); 177 DocumentModel folder = null; 178 try { 179 folder = getFactory().createFolderishNode(session, parent, node); 180 } catch (IOException e) { 181 String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e 182 + (e.getCause() != null ? e.getCause() : ""); 183 fslog(errorMsg, true); 184 log.error(errorMsg); 185 // Process folderish node creation error and check if the global 186 // import task should continue 187 boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node); 188 if (!shouldImportTaskContinue) { 189 throw new NuxeoException(e); 190 } 191 } finally { 192 split.stop(); 193 } 194 if (folder != null) { 195 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 196 fslog("Created Folder " + folder.getName() + " at " + parentPath, true); 197 198 // save session if needed 199 commit(); 200 } 201 return folder; 202 203 } 204 205 protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException { 206 if (!shouldImportDocument(node)) { 207 return null; 208 } 209 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf"); 210 Split split = stopwatch.start(); 211 DocumentModel leaf = null; 212 try { 213 leaf = getFactory().createLeafNode(session, parent, node); 214 } catch (IOException e) { 215 String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e 216 + (e.getCause() != null ? 
e.getCause() : ""); 217 fslog(errMsg, true); 218 log.error(errMsg); 219 // Process leaf node creation error and check if the global 220 // import task should continue 221 boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node); 222 if (!shouldImportTaskContinue) { 223 throw new NuxeoException(e); 224 } 225 } finally { 226 split.stop(); 227 } 228 BlobHolder bh = node.getBlobHolder(); 229 if (leaf != null && bh != null) { 230 Blob blob = bh.getBlob(); 231 if (blob != null) { 232 long fileSize = blob.getLength(); 233 String fileName = blob.getFilename(); 234 if (fileSize > 0) { 235 long kbSize = fileSize / 1024; 236 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 237 fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName 238 + " of size " + kbSize + "KB", true); 239 } 240 uploadedKO += fileSize; 241 } 242 243 // send an event about the imported document 244 EventProducer eventProducer = Framework.getService(EventProducer.class); 245 EventContextImpl eventContext = new DocumentEventContext(session, session.getPrincipal(), leaf); 246 Event event = eventContext.newEvent(DOC_IMPORTED_EVENT); 247 eventProducer.fireEvent(event); 248 249 // save session if needed 250 commit(); 251 } 252 return leaf; 253 } 254 255 protected boolean shouldImportDocument(SourceNode node) { 256 for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) { 257 if (!importingDocumentFilter.shouldImportDocument(node)) { 258 return false; 259 } 260 } 261 return true; 262 } 263 264 protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log, 265 Integer batchSize) { 266 GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent, 267 skipContainerCreation, log, batchSize, factory, threadPolicy, null); 268 newTask.addListeners(listeners); 269 
newTask.addImportingDocumentFilters(importingDocumentFilters); 270 return newTask; 271 } 272 273 protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) { 274 if (isRootTask) { 275 isRootTask = false; // don't fork Root thread on first folder 276 return null; 277 } 278 int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size(); 279 boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles, 280 batchSize, scheduledTasks); 281 282 if (createTask) { 283 GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize); 284 newTask.setBatchSize(getBatchSize()); 285 newTask.setSkipContainerCreation(true); 286 newTask.setTransactionTimeout(transactionTimeout); 287 return newTask; 288 } else { 289 return null; 290 } 291 } 292 293 protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException { 294 295 if (getFactory().isTargetDocumentModelFolderish(node)) { 296 DocumentModel folder; 297 Boolean newThread = false; 298 if (skipContainerCreation) { 299 folder = parent; 300 skipContainerCreation = false; 301 newThread = true; 302 } else { 303 folder = doCreateFolderishNode(parent, node); 304 if (folder == null) { 305 return; 306 } 307 } 308 309 // get a new TaskImporter if available to start 310 // processing the sub-tree 311 GenericThreadedImportTask task = null; 312 if (!newThread) { 313 task = createNewTaskIfNeeded(folder, node); 314 } 315 if (task != null) { 316 // force comit before starting new thread 317 commit(true); 318 try { 319 GenericMultiThreadedImporter.getExecutor().execute(task); 320 } catch (RejectedExecutionException e) { 321 log.error("Import task rejected", e); 322 } 323 324 } else { 325 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children"); 326 Split split = stopwatch.start(); 327 List<SourceNode> nodes = node.getChildren(); 328 
split.stop(); 329 if (nodes != null) { 330 for (SourceNode child : nodes) { 331 recursiveCreateDocumentFromNode(folder, child); 332 } 333 } 334 } 335 } else { 336 doCreateLeafNode(parent, node); 337 } 338 } 339 340 public void setInputSource(SourceNode node) { 341 this.rootSource = node; 342 } 343 344 public void setTargetFolder(DocumentModel rootDoc) { 345 this.rootDoc = rootDoc; 346 } 347 348 // TODO isRunning is not yet handled correctly 349 public boolean isRunning() { 350 synchronized (this) { 351 return isRunning; 352 } 353 } 354 355 @Override 356 public synchronized void run() { 357 synchronized (this) { 358 if (isRunning) { 359 throw new IllegalStateException("Task already running"); 360 } 361 isRunning = true; 362 // versions have no path, target document can be null 363 if (rootSource == null) { 364 isRunning = false; 365 throw new IllegalArgumentException("source node must be specified"); 366 } 367 } 368 TransactionHelper.startTransaction(transactionTimeout); 369 boolean completedAbruptly = true; 370 try { 371 session = CoreInstance.openCoreSessionSystem(repositoryName); 372 log.info("Starting new import task"); 373 if (rootDoc != null) { 374 // reopen the root to be sure the session is valid 375 rootDoc = session.getDocument(rootDoc.getRef()); 376 } 377 recursiveCreateDocumentFromNode(rootDoc, rootSource); 378 session.save(); 379 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 380 completedAbruptly = false; 381 } catch (Exception e) { // deals with interrupt below 382 log.error("Error during import", e); 383 ExceptionUtils.checkInterrupt(e); 384 notifyImportError(); 385 } finally { 386 log.info("End of task"); 387 try { 388 if (session != null) { 389 session.close(); 390 session = null; 391 } 392 } finally { 393 if (completedAbruptly) { 394 TransactionHelper.setTransactionRollbackOnly(); 395 } 396 TransactionHelper.commitOrRollbackTransaction(); 397 synchronized (this) { 398 isRunning = false; 399 } 400 } 401 } 402 } 403 404 // This 
should be done with log4j but I did not find a way to configure it 405 // the way I wanted ... 406 protected void fslog(String msg, boolean debug) { 407 if (debug) { 408 rsLogger.debug(msg); 409 } else { 410 rsLogger.info(msg); 411 } 412 } 413 414 public int getBatchSize() { 415 return batchSize; 416 } 417 418 public void setBatchSize(int batchSize) { 419 this.batchSize = batchSize; 420 } 421 422 public void setSkipContainerCreation(Boolean skipContainerCreation) { 423 this.skipContainerCreation = skipContainerCreation; 424 } 425 426 public void setRootTask() { 427 isRootTask = true; 428 taskCounter = 0; 429 taskId = "T0"; 430 } 431 432 protected ImporterThreadingPolicy getThreadPolicy() { 433 return threadPolicy; 434 } 435 436 protected ImporterDocumentModelFactory getFactory() { 437 return factory; 438 } 439 440 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 441 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 442 } 443 444 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 445 this.importingDocumentFilters.addAll(importingDocumentFilters); 446 } 447 448 public void addListeners(ImporterListener... listeners) { 449 addListeners(Arrays.asList(listeners)); 450 } 451 452 public void addListeners(Collection<ImporterListener> listeners) { 453 this.listeners.addAll(listeners); 454 } 455 456 public void setTransactionTimeout(int transactionTimeout) { 457 this.transactionTimeout = transactionTimeout < 1 ? 
TX_TIMEOUT : transactionTimeout; 458 } 459 460 protected void notifyImportError() { 461 for (ImporterListener listener : listeners) { 462 listener.importError(); 463 } 464 } 465 466 protected void setRootDoc(DocumentModel rootDoc) { 467 this.rootDoc = rootDoc; 468 } 469 470 protected void setRootSource(SourceNode rootSource) { 471 this.rootSource = rootSource; 472 } 473 474 protected void setFactory(ImporterDocumentModelFactory factory) { 475 this.factory = factory; 476 } 477 478 protected void setRsLogger(ImporterLogger rsLogger) { 479 this.rsLogger = rsLogger; 480 } 481 482 protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 483 this.threadPolicy = threadPolicy; 484 } 485 486 protected void setJobName(String jobName) { 487 this.jobName = jobName; 488 } 489 490}