001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.RejectedExecutionException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.javasimon.SimonManager; 034import org.javasimon.Split; 035import org.javasimon.Stopwatch; 036import org.nuxeo.common.utils.ExceptionUtils; 037import org.nuxeo.ecm.core.api.Blob; 038import org.nuxeo.ecm.core.api.CloseableCoreSession; 039import org.nuxeo.ecm.core.api.CoreInstance; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.api.blobholder.BlobHolder; 044import org.nuxeo.ecm.core.event.Event; 045import org.nuxeo.ecm.core.event.EventProducer; 046import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 047import org.nuxeo.ecm.core.event.impl.EventContextImpl; 048import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 049import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 050import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 051import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 052import org.nuxeo.ecm.platform.importer.source.SourceNode; 053import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 054import org.nuxeo.runtime.api.Framework; 055import org.nuxeo.runtime.transaction.TransactionHelper; 056 057/** 058 * Generic importer task 059 * 060 * @author Thierry Delprat 061 */ 062public class GenericThreadedImportTask implements Runnable { 063 064 public static final String DOC_IMPORTED_EVENT = "documentImportedWithPlatformImporter"; 065 066 private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class); 067 068 protected static int taskCounter = 0; 069 070 protected boolean isRunning = false; 071 072 protected long uploadedFiles = 0; 073 074 protected long uploadedKO; 075 076 protected int batchSize; 077 078 protected CoreSession session; 079 080 protected DocumentModel rootDoc; 081 082 protected SourceNode rootSource; 083 084 protected Boolean skipContainerCreation = false; 085 086 protected Boolean isRootTask = false; 087 088 protected String taskId = null; 089 090 public static final int TX_TIMEOUT = 600; 091 092 protected int transactionTimeout = TX_TIMEOUT; 093 094 protected ImporterThreadingPolicy threadPolicy; 095 096 protected ImporterDocumentModelFactory factory; 097 098 protected String jobName; 099 100 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 101 102 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 103 104 protected String repositoryName; 105 106 private static synchronized int getNextTaskId() { 107 taskCounter += 1; 108 return taskCounter; 109 } 110 111 protected ImporterLogger rsLogger = null; 112 113 protected GenericThreadedImportTask(CoreSession session) { 114 this.session = session; 115 uploadedFiles = 0; 116 taskId = "T" + getNextTaskId(); 117 } 118 119 protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc, 120 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 121 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) { 122 this.rsLogger = rsLogger; 123 this.session = session; 124 this.batchSize = batchSize; 125 uploadedFiles = 0; 126 taskId = "T" + getNextTaskId(); 127 this.rootSource = rootSource; 128 this.rootDoc = rootDoc; 129 this.skipContainerCreation = skipContainerCreation; 130 this.factory = factory; 131 this.threadPolicy = threadPolicy; 132 133 // there are documents without path, like versions 134 if (rootSource == null) { 135 throw new IllegalArgumentException("source node must be specified"); 136 } 137 } 138 139 public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc, 140 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 141 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) { 142 this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy); 143 this.jobName = jobName; 144 this.repositoryName = repositoryName; 145 } 146 147 protected CoreSession getCoreSession() { 148 return session; 149 } 150 151 protected void commit() { 152 commit(false); 153 } 154 155 protected void commit(boolean force) { 156 uploadedFiles++; 157 if (uploadedFiles % 10 == 0) { 158 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 159 } 160 161 if (uploadedFiles % batchSize == 0 || force) { 162 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save"); 163 Split split = stopwatch.start(); 164 fslog("Committing Core Session after " + uploadedFiles + " files", true); 165 session.save(); 166 TransactionHelper.commitOrRollbackTransaction(); 167 TransactionHelper.startTransaction(transactionTimeout); 168 split.stop(); 169 } 170 } 171 172 protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) { 173 if (!shouldImportDocument(node)) { 174 return null; 175 } 176 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder"); 177 Split split = stopwatch.start(); 178 DocumentModel folder = null; 179 try { 180 folder = getFactory().createFolderishNode(session, parent, node); 181 } catch (IOException e) { 182 String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e 183 + (e.getCause() != null ? e.getCause() : ""); 184 fslog(errorMsg, true); 185 log.error(errorMsg); 186 // Process folderish node creation error and check if the global 187 // import task should continue 188 boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node); 189 if (!shouldImportTaskContinue) { 190 throw new NuxeoException(e); 191 } 192 } finally { 193 split.stop(); 194 } 195 if (folder != null) { 196 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 197 fslog("Created Folder " + folder.getName() + " at " + parentPath, true); 198 199 // save session if needed 200 commit(); 201 } 202 return folder; 203 204 } 205 206 protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException { 207 if (!shouldImportDocument(node)) { 208 return null; 209 } 210 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf"); 211 Split split = stopwatch.start(); 212 DocumentModel leaf = null; 213 try { 214 leaf = getFactory().createLeafNode(session, parent, node); 215 } catch (IOException e) { 216 String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e 217 + (e.getCause() != null ? e.getCause() : ""); 218 fslog(errMsg, true); 219 log.error(errMsg); 220 // Process leaf node creation error and check if the global 221 // import task should continue 222 boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node); 223 if (!shouldImportTaskContinue) { 224 throw new NuxeoException(e); 225 } 226 } finally { 227 split.stop(); 228 } 229 BlobHolder bh = node.getBlobHolder(); 230 if (leaf != null && bh != null) { 231 Blob blob = bh.getBlob(); 232 if (blob != null) { 233 long fileSize = blob.getLength(); 234 String fileName = blob.getFilename(); 235 if (fileSize > 0) { 236 long kbSize = fileSize / 1024; 237 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 238 fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName 239 + " of size " + kbSize + "KB", true); 240 } 241 uploadedKO += fileSize; 242 } 243 244 // send an event about the imported document 245 EventProducer eventProducer = Framework.getService(EventProducer.class); 246 EventContextImpl eventContext = new DocumentEventContext(session, session.getPrincipal(), leaf); 247 Event event = eventContext.newEvent(DOC_IMPORTED_EVENT); 248 eventProducer.fireEvent(event); 249 250 // save session if needed 251 commit(); 252 } 253 return leaf; 254 } 255 256 protected boolean shouldImportDocument(SourceNode node) { 257 for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) { 258 if (!importingDocumentFilter.shouldImportDocument(node)) { 259 return false; 260 } 261 } 262 return true; 263 } 264 265 protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log, 266 Integer batchSize) { 267 GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent, 268 skipContainerCreation, log, batchSize, factory, threadPolicy, null); 269 newTask.addListeners(listeners); 270 newTask.addImportingDocumentFilters(importingDocumentFilters); 271 return newTask; 272 } 273 274 protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) { 275 if (isRootTask) { 276 isRootTask = false; // don't fork Root thread on first folder 277 return null; 278 } 279 int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size(); 280 boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles, 281 batchSize, scheduledTasks); 282 283 if (createTask) { 284 GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize); 285 newTask.setBatchSize(getBatchSize()); 286 newTask.setSkipContainerCreation(true); 287 newTask.setTransactionTimeout(transactionTimeout); 288 return newTask; 289 } else { 290 return null; 291 } 292 } 293 294 protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException { 295 296 if (getFactory().isTargetDocumentModelFolderish(node)) { 297 DocumentModel folder; 298 Boolean newThread = false; 299 if (skipContainerCreation) { 300 folder = parent; 301 skipContainerCreation = false; 302 newThread = true; 303 } else { 304 folder = doCreateFolderishNode(parent, node); 305 if (folder == null) { 306 return; 307 } 308 } 309 310 // get a new TaskImporter if available to start 311 // processing the sub-tree 312 GenericThreadedImportTask task = null; 313 if (!newThread) { 314 task = createNewTaskIfNeeded(folder, node); 315 } 316 if (task != null) { 317 // force comit before starting new thread 318 commit(true); 319 try { 320 GenericMultiThreadedImporter.getExecutor().execute(task); 321 } catch (RejectedExecutionException e) { 322 log.error("Import task rejected", e); 323 } 324 325 } else { 326 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children"); 327 Split split = stopwatch.start(); 328 List<SourceNode> nodes = node.getChildren(); 329 split.stop(); 330 if (nodes != null) { 331 for (SourceNode child : nodes) { 332 recursiveCreateDocumentFromNode(folder, child); 333 } 334 } 335 } 336 } else { 337 doCreateLeafNode(parent, node); 338 } 339 } 340 341 public void setInputSource(SourceNode node) { 342 this.rootSource = node; 343 } 344 345 public void setTargetFolder(DocumentModel rootDoc) { 346 this.rootDoc = rootDoc; 347 } 348 349 // TODO isRunning is not yet handled correctly 350 public boolean isRunning() { 351 synchronized (this) { 352 return isRunning; 353 } 354 } 355 356 @Override 357 public synchronized void run() { 358 synchronized (this) { 359 if (isRunning) { 360 throw new IllegalStateException("Task already running"); 361 } 362 isRunning = true; 363 // versions have no path, target document can be null 364 if (rootSource == null) { 365 isRunning = false; 366 throw new IllegalArgumentException("source node must be specified"); 367 } 368 } 369 TransactionHelper.startTransaction(transactionTimeout); 370 boolean completedAbruptly = true; 371 try (CloseableCoreSession closeableCoreSession = CoreInstance.openCoreSessionSystem(repositoryName)) { 372 session = closeableCoreSession; 373 log.info("Starting new import task"); 374 Framework.doPrivileged(() -> { 375 if (rootDoc != null) { 376 // reopen the root to be sure the session is valid 377 rootDoc = session.getDocument(rootDoc.getRef()); 378 } 379 try { 380 recursiveCreateDocumentFromNode(rootDoc, rootSource); 381 } catch (IOException e) { 382 throw new NuxeoException(e); 383 } 384 session.save(); 385 }); 386 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 387 completedAbruptly = false; 388 } catch (Exception e) { // deals with interrupt below 389 log.error("Error during import", e); 390 ExceptionUtils.checkInterrupt(e); 391 notifyImportError(); 392 } finally { 393 log.info("End of task"); 394 session = null; 395 if (completedAbruptly) { 396 TransactionHelper.setTransactionRollbackOnly(); 397 } 398 TransactionHelper.commitOrRollbackTransaction(); 399 synchronized (this) { 400 isRunning = false; 401 } 402 } 403 } 404 405 // This should be done with log4j but I did not find a way to configure it 406 // the way I wanted ... 407 protected void fslog(String msg, boolean debug) { 408 if (debug) { 409 rsLogger.debug(msg); 410 } else { 411 rsLogger.info(msg); 412 } 413 } 414 415 public int getBatchSize() { 416 return batchSize; 417 } 418 419 public void setBatchSize(int batchSize) { 420 this.batchSize = batchSize; 421 } 422 423 public void setSkipContainerCreation(Boolean skipContainerCreation) { 424 this.skipContainerCreation = skipContainerCreation; 425 } 426 427 public void setRootTask() { 428 isRootTask = true; 429 taskCounter = 0; 430 taskId = "T0"; 431 } 432 433 protected ImporterThreadingPolicy getThreadPolicy() { 434 return threadPolicy; 435 } 436 437 protected ImporterDocumentModelFactory getFactory() { 438 return factory; 439 } 440 441 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 442 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 443 } 444 445 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 446 this.importingDocumentFilters.addAll(importingDocumentFilters); 447 } 448 449 public void addListeners(ImporterListener... listeners) { 450 addListeners(Arrays.asList(listeners)); 451 } 452 453 public void addListeners(Collection<ImporterListener> listeners) { 454 this.listeners.addAll(listeners); 455 } 456 457 public void setTransactionTimeout(int transactionTimeout) { 458 this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout; 459 } 460 461 protected void notifyImportError() { 462 for (ImporterListener listener : listeners) { 463 listener.importError(); 464 } 465 } 466 467 protected void setRootDoc(DocumentModel rootDoc) { 468 this.rootDoc = rootDoc; 469 } 470 471 protected void setRootSource(SourceNode rootSource) { 472 this.rootSource = rootSource; 473 } 474 475 protected void setFactory(ImporterDocumentModelFactory factory) { 476 this.factory = factory; 477 } 478 479 protected void setRsLogger(ImporterLogger rsLogger) { 480 this.rsLogger = rsLogger; 481 } 482 483 protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 484 this.threadPolicy = threadPolicy; 485 } 486 487 protected void setJobName(String jobName) { 488 this.jobName = jobName; 489 } 490 491}