001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.RejectedExecutionException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.javasimon.SimonManager; 034import org.javasimon.Split; 035import org.javasimon.Stopwatch; 036import org.nuxeo.common.utils.ExceptionUtils; 037import org.nuxeo.ecm.core.api.Blob; 038import org.nuxeo.ecm.core.api.CoreInstance; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.api.blobholder.BlobHolder; 043import org.nuxeo.ecm.core.event.Event; 044import org.nuxeo.ecm.core.event.EventProducer; 045import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 046import org.nuxeo.ecm.core.event.impl.EventContextImpl; 047import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 048import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 049import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 050import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 051import org.nuxeo.ecm.platform.importer.source.SourceNode; 052import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 053import org.nuxeo.runtime.api.Framework; 054import org.nuxeo.runtime.transaction.TransactionHelper; 055 056/** 057 * Generic importer task 058 * 059 * @author Thierry Delprat 060 */ 061public class GenericThreadedImportTask implements Runnable { 062 063 public static final String DOC_IMPORTED_EVENT = "documentImportedWithPlatformImporter"; 064 065 private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class); 066 067 protected static int taskCounter = 0; 068 069 protected boolean isRunning = false; 070 071 protected long uploadedFiles = 0; 072 073 protected long uploadedKO; 074 075 protected int batchSize; 076 077 protected CoreSession session; 078 079 protected DocumentModel rootDoc; 080 081 protected SourceNode rootSource; 082 083 protected Boolean skipContainerCreation = false; 084 085 protected Boolean isRootTask = false; 086 087 protected String taskId = null; 088 089 public static final int TX_TIMEOUT = 600; 090 091 protected int transactionTimeout = TX_TIMEOUT; 092 093 protected ImporterThreadingPolicy threadPolicy; 094 095 protected ImporterDocumentModelFactory factory; 096 097 protected String jobName; 098 099 protected List<ImporterListener> listeners = new ArrayList<>(); 100 101 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<>(); 102 103 protected String repositoryName; 104 105 private static synchronized int getNextTaskId() { 106 taskCounter += 1; 107 return taskCounter; 108 } 109 110 protected ImporterLogger rsLogger = null; 111 112 protected GenericThreadedImportTask(CoreSession session) { 113 this.session = session; 114 uploadedFiles = 0; 115 taskId = "T" + getNextTaskId(); 116 } 117 118 protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc, 119 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 120 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) { 121 this.rsLogger = rsLogger; 122 this.session = session; 123 this.batchSize = batchSize; 124 uploadedFiles = 0; 125 taskId = "T" + getNextTaskId(); 126 this.rootSource = rootSource; 127 this.rootDoc = rootDoc; 128 this.skipContainerCreation = skipContainerCreation; 129 this.factory = factory; 130 this.threadPolicy = threadPolicy; 131 132 // there are documents without path, like versions 133 if (rootSource == null) { 134 throw new IllegalArgumentException("source node must be specified"); 135 } 136 } 137 138 public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc, 139 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 140 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) { 141 this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy); 142 this.jobName = jobName; 143 this.repositoryName = repositoryName; 144 } 145 146 protected CoreSession getCoreSession() { 147 return session; 148 } 149 150 protected void commit() { 151 commit(false); 152 } 153 154 protected void commit(boolean force) { 155 uploadedFiles++; 156 if (uploadedFiles % 10 == 0) { 157 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 158 } 159 160 if (uploadedFiles % batchSize == 0 || force) { 161 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save"); 162 Split split = stopwatch.start(); 163 fslog("Committing Core Session after " + uploadedFiles + " files", true); 164 session.save(); 165 TransactionHelper.commitOrRollbackTransaction(); 166 TransactionHelper.startTransaction(transactionTimeout); 167 split.stop(); 168 } 169 } 170 171 protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) { 172 if (!shouldImportDocument(node)) { 173 return null; 174 } 175 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder"); 176 Split split = stopwatch.start(); 177 DocumentModel folder = null; 178 try { 179 folder = getFactory().createFolderishNode(session, parent, node); 180 } catch (IOException e) { 181 String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e 182 + (e.getCause() != null ? e.getCause() : ""); 183 fslog(errorMsg, true); 184 log.error(errorMsg); 185 // Process folderish node creation error and check if the global 186 // import task should continue 187 boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node); 188 if (!shouldImportTaskContinue) { 189 throw new NuxeoException(e); 190 } 191 } finally { 192 split.stop(); 193 } 194 if (folder != null) { 195 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 196 fslog("Created Folder " + folder.getName() + " at " + parentPath, true); 197 198 // save session if needed 199 commit(); 200 } 201 return folder; 202 203 } 204 205 protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException { 206 if (!shouldImportDocument(node)) { 207 return null; 208 } 209 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf"); 210 Split split = stopwatch.start(); 211 DocumentModel leaf = null; 212 try { 213 leaf = getFactory().createLeafNode(session, parent, node); 214 } catch (IOException e) { 215 String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e 216 + (e.getCause() != null ? e.getCause() : ""); 217 fslog(errMsg, true); 218 log.error(errMsg); 219 // Process leaf node creation error and check if the global 220 // import task should continue 221 boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node); 222 if (!shouldImportTaskContinue) { 223 throw new NuxeoException(e); 224 } 225 } finally { 226 split.stop(); 227 } 228 BlobHolder bh = node.getBlobHolder(); 229 if (leaf != null && bh != null) { 230 Blob blob = bh.getBlob(); 231 if (blob != null) { 232 long fileSize = blob.getLength(); 233 String fileName = blob.getFilename(); 234 if (fileSize > 0) { 235 long kbSize = fileSize / 1024; 236 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 237 fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName 238 + " of size " + kbSize + "KB", true); 239 } 240 uploadedKO += fileSize; 241 } 242 243 // send an event about the imported document 244 EventProducer eventProducer = Framework.getService(EventProducer.class); 245 EventContextImpl eventContext = new DocumentEventContext(session, session.getPrincipal(), leaf); 246 Event event = eventContext.newEvent(DOC_IMPORTED_EVENT); 247 eventProducer.fireEvent(event); 248 249 // save session if needed 250 commit(); 251 } 252 return leaf; 253 } 254 255 protected boolean shouldImportDocument(SourceNode node) { 256 for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) { 257 if (!importingDocumentFilter.shouldImportDocument(node)) { 258 return false; 259 } 260 } 261 return true; 262 } 263 264 protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log, 265 Integer batchSize) { 266 GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent, 267 skipContainerCreation, log, batchSize, factory, threadPolicy, null); 268 newTask.addListeners(listeners); 269 newTask.addImportingDocumentFilters(importingDocumentFilters); 270 return newTask; 271 } 272 273 protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) { 274 if (isRootTask) { 275 isRootTask = false; // don't fork Root thread on first folder 276 return null; 277 } 278 int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size(); 279 boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles, 280 batchSize, scheduledTasks); 281 282 if (createTask) { 283 GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize); 284 newTask.setBatchSize(getBatchSize()); 285 newTask.setSkipContainerCreation(true); 286 newTask.setTransactionTimeout(transactionTimeout); 287 return newTask; 288 } else { 289 return null; 290 } 291 } 292 293 protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException { 294 295 if (getFactory().isTargetDocumentModelFolderish(node)) { 296 DocumentModel folder; 297 Boolean newThread = false; 298 if (skipContainerCreation) { 299 folder = parent; 300 skipContainerCreation = false; 301 newThread = true; 302 } else { 303 folder = doCreateFolderishNode(parent, node); 304 if (folder == null) { 305 return; 306 } 307 } 308 309 // get a new TaskImporter if available to start 310 // processing the sub-tree 311 GenericThreadedImportTask task = null; 312 if (!newThread) { 313 task = createNewTaskIfNeeded(folder, node); 314 } 315 if (task != null) { 316 // force comit before starting new thread 317 commit(true); 318 try { 319 GenericMultiThreadedImporter.getExecutor().execute(task); 320 } catch (RejectedExecutionException e) { 321 log.error("Import task rejected", e); 322 } 323 324 } else { 325 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children"); 326 Split split = stopwatch.start(); 327 List<SourceNode> nodes = node.getChildren(); 328 split.stop(); 329 if (nodes != null) { 330 for (SourceNode child : nodes) { 331 recursiveCreateDocumentFromNode(folder, child); 332 } 333 } 334 } 335 } else { 336 doCreateLeafNode(parent, node); 337 } 338 } 339 340 public void setInputSource(SourceNode node) { 341 this.rootSource = node; 342 } 343 344 public void setTargetFolder(DocumentModel rootDoc) { 345 this.rootDoc = rootDoc; 346 } 347 348 // TODO isRunning is not yet handled correctly 349 public boolean isRunning() { 350 synchronized (this) { 351 return isRunning; 352 } 353 } 354 355 @Override 356 public synchronized void run() { 357 synchronized (this) { 358 if (isRunning) { 359 throw new IllegalStateException("Task already running"); 360 } 361 isRunning = true; 362 // versions have no path, target document can be null 363 if (rootSource == null) { 364 isRunning = false; 365 throw new IllegalArgumentException("source node must be specified"); 366 } 367 } 368 TransactionHelper.startTransaction(transactionTimeout); 369 boolean completedAbruptly = true; 370 try { 371 session = CoreInstance.getCoreSessionSystem(repositoryName); 372 log.info("Starting new import task"); 373 Framework.doPrivileged(() -> { 374 if (rootDoc != null) { 375 // reopen the root to be sure the session is valid 376 rootDoc = session.getDocument(rootDoc.getRef()); 377 } 378 try { 379 recursiveCreateDocumentFromNode(rootDoc, rootSource); 380 } catch (IOException e) { 381 throw new NuxeoException(e); 382 } 383 session.save(); 384 }); 385 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 386 completedAbruptly = false; 387 } catch (Exception e) { // deals with interrupt below 388 log.error("Error during import", e); 389 ExceptionUtils.checkInterrupt(e); 390 notifyImportError(); 391 } finally { 392 log.info("End of task"); 393 if (completedAbruptly) { 394 TransactionHelper.setTransactionRollbackOnly(); 395 } 396 TransactionHelper.commitOrRollbackTransaction(); 397 synchronized (this) { 398 isRunning = false; 399 } 400 } 401 } 402 403 // This should be done with log4j but I did not find a way to configure it 404 // the way I wanted ... 405 protected void fslog(String msg, boolean debug) { 406 if (debug) { 407 rsLogger.debug(msg); 408 } else { 409 rsLogger.info(msg); 410 } 411 } 412 413 public int getBatchSize() { 414 return batchSize; 415 } 416 417 public void setBatchSize(int batchSize) { 418 this.batchSize = batchSize; 419 } 420 421 public void setSkipContainerCreation(Boolean skipContainerCreation) { 422 this.skipContainerCreation = skipContainerCreation; 423 } 424 425 public void setRootTask() { 426 isRootTask = true; 427 taskCounter = 0; 428 taskId = "T0"; 429 } 430 431 protected ImporterThreadingPolicy getThreadPolicy() { 432 return threadPolicy; 433 } 434 435 protected ImporterDocumentModelFactory getFactory() { 436 return factory; 437 } 438 439 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 440 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 441 } 442 443 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 444 this.importingDocumentFilters.addAll(importingDocumentFilters); 445 } 446 447 public void addListeners(ImporterListener... listeners) { 448 addListeners(Arrays.asList(listeners)); 449 } 450 451 public void addListeners(Collection<ImporterListener> listeners) { 452 this.listeners.addAll(listeners); 453 } 454 455 public void setTransactionTimeout(int transactionTimeout) { 456 this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout; 457 } 458 459 protected void notifyImportError() { 460 for (ImporterListener listener : listeners) { 461 listener.importError(); 462 } 463 } 464 465 protected void setRootDoc(DocumentModel rootDoc) { 466 this.rootDoc = rootDoc; 467 } 468 469 protected void setRootSource(SourceNode rootSource) { 470 this.rootSource = rootSource; 471 } 472 473 protected void setFactory(ImporterDocumentModelFactory factory) { 474 this.factory = factory; 475 } 476 477 protected void setRsLogger(ImporterLogger rsLogger) { 478 this.rsLogger = rsLogger; 479 } 480 481 protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 482 this.threadPolicy = threadPolicy; 483 } 484 485 protected void setJobName(String jobName) { 486 this.jobName = jobName; 487 } 488 489}