001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 * 019 * $Id$ 020 */ 021 022package org.nuxeo.ecm.platform.importer.base; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.RejectedExecutionException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.javasimon.SimonManager; 034import org.javasimon.Split; 035import org.javasimon.Stopwatch; 036import org.nuxeo.common.utils.ExceptionUtils; 037import org.nuxeo.ecm.core.api.Blob; 038import org.nuxeo.ecm.core.api.CoreInstance; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.api.blobholder.BlobHolder; 043import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory; 044import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter; 045import org.nuxeo.ecm.platform.importer.listener.ImporterListener; 046import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 047import org.nuxeo.ecm.platform.importer.source.SourceNode; 048import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy; 049import org.nuxeo.runtime.transaction.TransactionHelper; 050 051/** 052 * Generic importer task 053 * 054 * @author Thierry Delprat 055 */ 056public class GenericThreadedImportTask implements Runnable { 057 058 private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class); 059 060 protected static int taskCounter = 0; 061 062 protected boolean isRunning = false; 063 064 protected long uploadedFiles = 0; 065 066 protected long uploadedKO; 067 068 protected int batchSize; 069 070 protected CoreSession session; 071 072 protected DocumentModel rootDoc; 073 074 protected SourceNode rootSource; 075 076 protected Boolean skipContainerCreation = false; 077 078 protected Boolean isRootTask = false; 079 080 protected String taskId = null; 081 082 public static final int TX_TIMEOUT = 600; 083 084 protected int transactionTimeout = TX_TIMEOUT; 085 086 protected ImporterThreadingPolicy threadPolicy; 087 088 protected ImporterDocumentModelFactory factory; 089 090 protected String jobName; 091 092 protected List<ImporterListener> listeners = new ArrayList<ImporterListener>(); 093 094 protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>(); 095 096 protected String repositoryName; 097 098 private static synchronized int getNextTaskId() { 099 taskCounter += 1; 100 return taskCounter; 101 } 102 103 protected ImporterLogger rsLogger = null; 104 105 protected GenericThreadedImportTask(CoreSession session) { 106 this.session = session; 107 uploadedFiles = 0; 108 taskId = "T" + getNextTaskId(); 109 } 110 111 protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc, 112 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 113 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) { 114 this.rsLogger = rsLogger; 115 this.session = session; 116 this.batchSize = batchSize; 117 uploadedFiles = 0; 118 taskId = "T" + getNextTaskId(); 119 this.rootSource = rootSource; 120 this.rootDoc = rootDoc; 121 this.skipContainerCreation = skipContainerCreation; 122 this.factory = factory; 123 this.threadPolicy = threadPolicy; 124 125 // there are documents without path, like versions 126 if (rootSource == null) { 127 throw new IllegalArgumentException("source node must be specified"); 128 } 129 } 130 131 public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc, 132 boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize, 133 ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) { 134 this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy); 135 this.jobName = jobName; 136 this.repositoryName = repositoryName; 137 } 138 139 protected CoreSession getCoreSession() { 140 return session; 141 } 142 143 protected void commit() { 144 commit(false); 145 } 146 147 protected void commit(boolean force) { 148 uploadedFiles++; 149 if (uploadedFiles % 10 == 0) { 150 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 151 } 152 153 if (uploadedFiles % batchSize == 0 || force) { 154 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save"); 155 Split split = stopwatch.start(); 156 fslog("Committing Core Session after " + uploadedFiles + " files", true); 157 session.save(); 158 TransactionHelper.commitOrRollbackTransaction(); 159 TransactionHelper.startTransaction(transactionTimeout); 160 split.stop(); 161 } 162 } 163 164 protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) { 165 if (!shouldImportDocument(node)) { 166 return null; 167 } 168 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder"); 169 Split split = stopwatch.start(); 170 DocumentModel folder = null; 171 try { 172 folder = getFactory().createFolderishNode(session, parent, node); 173 } catch (IOException e) { 174 String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e 175 + (e.getCause() != null ? e.getCause() : ""); 176 fslog(errorMsg, true); 177 log.error(errorMsg); 178 // Process folderish node creation error and check if the global 179 // import task should continue 180 boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node); 181 if (!shouldImportTaskContinue) { 182 throw new NuxeoException(e); 183 } 184 } finally { 185 split.stop(); 186 } 187 if (folder != null) { 188 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 189 fslog("Created Folder " + folder.getName() + " at " + parentPath, true); 190 191 // save session if needed 192 commit(); 193 } 194 return folder; 195 196 } 197 198 protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException { 199 if (!shouldImportDocument(node)) { 200 return null; 201 } 202 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf"); 203 Split split = stopwatch.start(); 204 DocumentModel leaf = null; 205 try { 206 leaf = getFactory().createLeafNode(session, parent, node); 207 } catch (IOException e) { 208 String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e 209 + (e.getCause() != null ? e.getCause() : ""); 210 fslog(errMsg, true); 211 log.error(errMsg); 212 // Process leaf node creation error and check if the global 213 // import task should continue 214 boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node); 215 if (!shouldImportTaskContinue) { 216 throw new NuxeoException(e); 217 } 218 } finally { 219 split.stop(); 220 } 221 BlobHolder bh = node.getBlobHolder(); 222 if (leaf != null && bh != null) { 223 Blob blob = bh.getBlob(); 224 if (blob != null) { 225 long fileSize = blob.getLength(); 226 String fileName = blob.getFilename(); 227 if (fileSize > 0) { 228 long kbSize = fileSize / 1024; 229 String parentPath = (parent == null) ? "null" : parent.getPathAsString(); 230 fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName 231 + " of size " + kbSize + "KB", true); 232 } 233 uploadedKO += fileSize; 234 } 235 236 // save session if needed 237 commit(); 238 } 239 return leaf; 240 } 241 242 protected boolean shouldImportDocument(SourceNode node) { 243 for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) { 244 if (!importingDocumentFilter.shouldImportDocument(node)) { 245 return false; 246 } 247 } 248 return true; 249 } 250 251 protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log, 252 Integer batchSize) { 253 GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent, 254 skipContainerCreation, log, batchSize, factory, threadPolicy, null); 255 newTask.addListeners(listeners); 256 newTask.addImportingDocumentFilters(importingDocumentFilters); 257 return newTask; 258 } 259 260 protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) { 261 if (isRootTask) { 262 isRootTask = false; // don't fork Root thread on first folder 263 return null; 264 } 265 int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size(); 266 boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles, 267 batchSize, scheduledTasks); 268 269 if (createTask) { 270 GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize); 271 newTask.setBatchSize(getBatchSize()); 272 newTask.setSkipContainerCreation(true); 273 newTask.setTransactionTimeout(transactionTimeout); 274 return newTask; 275 } else { 276 return null; 277 } 278 } 279 280 protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException { 281 282 if (getFactory().isTargetDocumentModelFolderish(node)) { 283 DocumentModel folder; 284 Boolean newThread = false; 285 if (skipContainerCreation) { 286 folder = parent; 287 skipContainerCreation = false; 288 newThread = true; 289 } else { 290 folder = doCreateFolderishNode(parent, node); 291 if (folder == null) { 292 return; 293 } 294 } 295 296 // get a new TaskImporter if available to start 297 // processing the sub-tree 298 GenericThreadedImportTask task = null; 299 if (!newThread) { 300 task = createNewTaskIfNeeded(folder, node); 301 } 302 if (task != null) { 303 // force comit before starting new thread 304 commit(true); 305 try { 306 GenericMultiThreadedImporter.getExecutor().execute(task); 307 } catch (RejectedExecutionException e) { 308 log.error("Import task rejected", e); 309 } 310 311 } else { 312 Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children"); 313 Split split = stopwatch.start(); 314 List<SourceNode> nodes = node.getChildren(); 315 split.stop(); 316 if (nodes != null) { 317 for (SourceNode child : nodes) { 318 recursiveCreateDocumentFromNode(folder, child); 319 } 320 } 321 } 322 } else { 323 doCreateLeafNode(parent, node); 324 } 325 } 326 327 public void setInputSource(SourceNode node) { 328 this.rootSource = node; 329 } 330 331 public void setTargetFolder(DocumentModel rootDoc) { 332 this.rootDoc = rootDoc; 333 } 334 335 // TODO isRunning is not yet handled correctly 336 public boolean isRunning() { 337 synchronized (this) { 338 return isRunning; 339 } 340 } 341 342 public synchronized void run() { 343 TransactionHelper.startTransaction(transactionTimeout); 344 synchronized (this) { 345 if (isRunning) { 346 throw new IllegalStateException("Task already running"); 347 } 348 isRunning = true; 349 // versions have no path, target document can be null 350 if (rootSource == null) { 351 isRunning = false; 352 throw new IllegalArgumentException("source node must be specified"); 353 } 354 } 355 try { 356 session = CoreInstance.openCoreSessionSystem(repositoryName); 357 log.info("Starting new import task"); 358 if (rootDoc != null) { 359 // reopen the root to be sure the session is valid 360 rootDoc = session.getDocument(rootDoc.getRef()); 361 } 362 recursiveCreateDocumentFromNode(rootDoc, rootSource); 363 session.save(); 364 GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles); 365 TransactionHelper.commitOrRollbackTransaction(); 366 } catch (Exception e) { // deals with interrupt below 367 log.error("Error during import", e); 368 ExceptionUtils.checkInterrupt(e); 369 notifyImportError(); 370 } finally { 371 log.info("End of task"); 372 if (session != null) { 373 session.close(); 374 session = null; 375 } 376 synchronized (this) { 377 isRunning = false; 378 } 379 } 380 } 381 382 // This should be done with log4j but I did not find a way to configure it 383 // the way I wanted ... 384 protected void fslog(String msg, boolean debug) { 385 if (debug) { 386 rsLogger.debug(msg); 387 } else { 388 rsLogger.info(msg); 389 } 390 } 391 392 public int getBatchSize() { 393 return batchSize; 394 } 395 396 public void setBatchSize(int batchSize) { 397 this.batchSize = batchSize; 398 } 399 400 public void setSkipContainerCreation(Boolean skipContainerCreation) { 401 this.skipContainerCreation = skipContainerCreation; 402 } 403 404 public void setRootTask() { 405 isRootTask = true; 406 taskCounter = 0; 407 taskId = "T0"; 408 } 409 410 protected ImporterThreadingPolicy getThreadPolicy() { 411 return threadPolicy; 412 } 413 414 protected ImporterDocumentModelFactory getFactory() { 415 return factory; 416 } 417 418 public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) { 419 addImportingDocumentFilters(Arrays.asList(importingDocumentFilters)); 420 } 421 422 public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) { 423 this.importingDocumentFilters.addAll(importingDocumentFilters); 424 } 425 426 public void addListeners(ImporterListener... listeners) { 427 addListeners(Arrays.asList(listeners)); 428 } 429 430 public void addListeners(Collection<ImporterListener> listeners) { 431 this.listeners.addAll(listeners); 432 } 433 434 public void setTransactionTimeout(int transactionTimeout) { 435 this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout; 436 } 437 438 protected void notifyImportError() { 439 for (ImporterListener listener : listeners) { 440 listener.importError(); 441 } 442 } 443 444 protected void setRootDoc(DocumentModel rootDoc) { 445 this.rootDoc = rootDoc; 446 } 447 448 protected void setRootSource(SourceNode rootSource) { 449 this.rootSource = rootSource; 450 } 451 452 protected void setFactory(ImporterDocumentModelFactory factory) { 453 this.factory = factory; 454 } 455 456 protected void setRsLogger(ImporterLogger rsLogger) { 457 this.rsLogger = rsLogger; 458 } 459 460 protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) { 461 this.threadPolicy = threadPolicy; 462 } 463 464 protected void setJobName(String jobName) { 465 this.jobName = jobName; 466 } 467 468}