001/*
002 * (C) Copyright 2006-2008 Nuxeo SAS (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Nuxeo - initial API and implementation
016 *
017 * $Id$
018 */
019
020package org.nuxeo.ecm.platform.importer.base;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.List;
027import java.util.concurrent.RejectedExecutionException;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.javasimon.SimonManager;
032import org.javasimon.Split;
033import org.javasimon.Stopwatch;
034import org.nuxeo.common.utils.ExceptionUtils;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.blobholder.BlobHolder;
041import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
042import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
043import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
044import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
045import org.nuxeo.ecm.platform.importer.source.SourceNode;
046import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049/**
050 * Generic importer task
051 *
052 * @author Thierry Delprat
053 */
054public class GenericThreadedImportTask implements Runnable {
055
056    private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class);
057
058    protected static int taskCounter = 0;
059
060    protected boolean isRunning = false;
061
062    protected long uploadedFiles = 0;
063
064    protected long uploadedKO;
065
066    protected int batchSize;
067
068    protected CoreSession session;
069
070    protected DocumentModel rootDoc;
071
072    protected SourceNode rootSource;
073
074    protected Boolean skipContainerCreation = false;
075
076    protected Boolean isRootTask = false;
077
078    protected String taskId = null;
079
080    public static final int TX_TIMEOUT = 600;
081
082    protected int transactionTimeout = TX_TIMEOUT;
083
084    protected ImporterThreadingPolicy threadPolicy;
085
086    protected ImporterDocumentModelFactory factory;
087
088    protected String jobName;
089
090    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
091
092    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
093
094    protected String repositoryName;
095
096    private static synchronized int getNextTaskId() {
097        taskCounter += 1;
098        return taskCounter;
099    }
100
101    protected ImporterLogger rsLogger = null;
102
103    protected GenericThreadedImportTask(CoreSession session) {
104        this.session = session;
105        uploadedFiles = 0;
106        taskId = "T" + getNextTaskId();
107    }
108
109    protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc,
110            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
111            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) {
112        this.rsLogger = rsLogger;
113        this.session = session;
114        this.batchSize = batchSize;
115        uploadedFiles = 0;
116        taskId = "T" + getNextTaskId();
117        this.rootSource = rootSource;
118        this.rootDoc = rootDoc;
119        this.skipContainerCreation = skipContainerCreation;
120        this.factory = factory;
121        this.threadPolicy = threadPolicy;
122
123        // there are documents without path, like versions
124        if (rootSource == null) {
125            throw new IllegalArgumentException("source node must be specified");
126        }
127    }
128
129    public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc,
130            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
131            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) {
132        this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy);
133        this.jobName = jobName;
134        this.repositoryName = repositoryName;
135    }
136
137    protected CoreSession getCoreSession() {
138        return session;
139    }
140
141    protected void commit() {
142        commit(false);
143    }
144
145    protected void commit(boolean force) {
146        uploadedFiles++;
147        if (uploadedFiles % 10 == 0) {
148            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
149        }
150
151        if (uploadedFiles % batchSize == 0 || force) {
152            Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save");
153            Split split = stopwatch.start();
154            fslog("Comiting Core Session after " + uploadedFiles + " files", true);
155            session.save();
156            TransactionHelper.commitOrRollbackTransaction();
157            TransactionHelper.startTransaction(transactionTimeout);
158            split.stop();
159        }
160    }
161
162    protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) {
163        if (!shouldImportDocument(node)) {
164            return null;
165        }
166        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder");
167        Split split = stopwatch.start();
168        DocumentModel folder = null;
169        try {
170            folder = getFactory().createFolderishNode(session, parent, node);
171        } catch (IOException e) {
172            String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e
173                    + (e.getCause() != null ? e.getCause() : "");
174            fslog(errorMsg, true);
175            log.error(errorMsg);
176            // Process folderish node creation error and check if the global
177            // import task should continue
178            boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node);
179            if (!shouldImportTaskContinue) {
180                throw new NuxeoException(e);
181            }
182        } finally {
183            split.stop();
184        }
185        if (folder != null) {
186            String parentPath = (parent == null) ? "null" : parent.getPathAsString();
187            fslog("Created Folder " + folder.getName() + " at " + parentPath, true);
188
189            // save session if needed
190            commit();
191        }
192        return folder;
193
194    }
195
196    protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException {
197        if (!shouldImportDocument(node)) {
198            return null;
199        }
200        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf");
201        Split split = stopwatch.start();
202        DocumentModel leaf = null;
203        try {
204            leaf = getFactory().createLeafNode(session, parent, node);
205        } catch (IOException e) {
206            String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e
207                    + (e.getCause() != null ? e.getCause() : "");
208            fslog(errMsg, true);
209            log.error(errMsg);
210            // Process leaf node creation error and check if the global
211            // import task should continue
212            boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node);
213            if (!shouldImportTaskContinue) {
214                throw new NuxeoException(e);
215            }
216        } finally {
217            split.stop();
218        }
219        BlobHolder bh = node.getBlobHolder();
220        if (leaf != null && bh != null) {
221            Blob blob = bh.getBlob();
222            if (blob != null) {
223                long fileSize = blob.getLength();
224                String fileName = blob.getFilename();
225                if (fileSize > 0) {
226                    long kbSize = fileSize / 1024;
227                    String parentPath = (parent == null) ? "null" : parent.getPathAsString();
228                    fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName
229                            + " of size " + kbSize + "KB", true);
230                }
231                uploadedKO += fileSize;
232            }
233
234            // save session if needed
235            commit();
236        }
237        return leaf;
238    }
239
240    protected boolean shouldImportDocument(SourceNode node) {
241        for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) {
242            if (!importingDocumentFilter.shouldImportDocument(node)) {
243                return false;
244            }
245        }
246        return true;
247    }
248
249    protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log,
250            Integer batchSize) {
251        GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent,
252                skipContainerCreation, log, batchSize, factory, threadPolicy, null);
253        newTask.addListeners(listeners);
254        newTask.addImportingDocumentFilters(importingDocumentFilters);
255        return newTask;
256    }
257
258    protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) {
259        if (isRootTask) {
260            isRootTask = false; // don't fork Root thread on first folder
261            return null;
262        }
263        int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size();
264        boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles,
265                batchSize, scheduledTasks);
266
267        if (createTask) {
268            GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize);
269            newTask.setBatchSize(getBatchSize());
270            newTask.setSkipContainerCreation(true);
271            newTask.setTransactionTimeout(transactionTimeout);
272            return newTask;
273        } else {
274            return null;
275        }
276    }
277
278    protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException {
279
280        if (getFactory().isTargetDocumentModelFolderish(node)) {
281            DocumentModel folder;
282            Boolean newThread = false;
283            if (skipContainerCreation) {
284                folder = parent;
285                skipContainerCreation = false;
286                newThread = true;
287            } else {
288                folder = doCreateFolderishNode(parent, node);
289                if (folder == null) {
290                    return;
291                }
292            }
293
294            // get a new TaskImporter if available to start
295            // processing the sub-tree
296            GenericThreadedImportTask task = null;
297            if (!newThread) {
298                task = createNewTaskIfNeeded(folder, node);
299            }
300            if (task != null) {
301                // force comit before starting new thread
302                commit(true);
303                try {
304                    GenericMultiThreadedImporter.getExecutor().execute(task);
305                } catch (RejectedExecutionException e) {
306                    log.error("Import task rejected", e);
307                }
308
309            } else {
310                Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children");
311                Split split = stopwatch.start();
312                List<SourceNode> nodes = node.getChildren();
313                split.stop();
314                if (nodes != null) {
315                    for (SourceNode child : nodes) {
316                        recursiveCreateDocumentFromNode(folder, child);
317                    }
318                }
319            }
320        } else {
321            doCreateLeafNode(parent, node);
322        }
323    }
324
325    public void setInputSource(SourceNode node) {
326        this.rootSource = node;
327    }
328
329    public void setTargetFolder(DocumentModel rootDoc) {
330        this.rootDoc = rootDoc;
331    }
332
333    // TODO isRunning is not yet handled correctly
334    public boolean isRunning() {
335        synchronized (this) {
336            return isRunning;
337        }
338    }
339
340    public synchronized void run() {
341        TransactionHelper.startTransaction(transactionTimeout);
342        synchronized (this) {
343            if (isRunning) {
344                throw new IllegalStateException("Task already running");
345            }
346            isRunning = true;
347            // versions have no path, target document can be null
348            if (rootSource == null) {
349                isRunning = false;
350                throw new IllegalArgumentException("source node must be specified");
351            }
352        }
353        try {
354            session = CoreInstance.openCoreSessionSystem(repositoryName);
355            log.info("Starting new import task");
356            if (rootDoc != null) {
357                // reopen the root to be sure the session is valid
358                rootDoc = session.getDocument(rootDoc.getRef());
359            }
360            recursiveCreateDocumentFromNode(rootDoc, rootSource);
361            session.save();
362            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
363            TransactionHelper.commitOrRollbackTransaction();
364        } catch (Exception e) { // deals with interrupt below
365            log.error("Error during import", e);
366            ExceptionUtils.checkInterrupt(e);
367            notifyImportError();
368        } finally {
369            log.info("End of task");
370            if (session != null) {
371                session.close();
372                session = null;
373            }
374            synchronized (this) {
375                isRunning = false;
376            }
377        }
378    }
379
380    // This should be done with log4j but I did not find a way to configure it
381    // the way I wanted ...
382    protected void fslog(String msg, boolean debug) {
383        if (debug) {
384            rsLogger.debug(msg);
385        } else {
386            rsLogger.info(msg);
387        }
388    }
389
390    public int getBatchSize() {
391        return batchSize;
392    }
393
394    public void setBatchSize(int batchSize) {
395        this.batchSize = batchSize;
396    }
397
398    public void setSkipContainerCreation(Boolean skipContainerCreation) {
399        this.skipContainerCreation = skipContainerCreation;
400    }
401
402    public void setRootTask() {
403        isRootTask = true;
404        taskCounter = 0;
405        taskId = "T0";
406    }
407
408    protected ImporterThreadingPolicy getThreadPolicy() {
409        return threadPolicy;
410    }
411
412    protected ImporterDocumentModelFactory getFactory() {
413        return factory;
414    }
415
416    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
417        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
418    }
419
420    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
421        this.importingDocumentFilters.addAll(importingDocumentFilters);
422    }
423
424    public void addListeners(ImporterListener... listeners) {
425        addListeners(Arrays.asList(listeners));
426    }
427
428    public void addListeners(Collection<ImporterListener> listeners) {
429        this.listeners.addAll(listeners);
430    }
431
432    public void setTransactionTimeout(int transactionTimeout) {
433        this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout;
434    }
435
436    protected void notifyImportError() {
437        for (ImporterListener listener : listeners) {
438            listener.importError();
439        }
440    }
441
442    protected void setRootDoc(DocumentModel rootDoc) {
443        this.rootDoc = rootDoc;
444    }
445
446    protected void setRootSource(SourceNode rootSource) {
447        this.rootSource = rootSource;
448    }
449
450    protected void setFactory(ImporterDocumentModelFactory factory) {
451        this.factory = factory;
452    }
453
454    protected void setRsLogger(ImporterLogger rsLogger) {
455        this.rsLogger = rsLogger;
456    }
457
458    protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
459        this.threadPolicy = threadPolicy;
460    }
461
462    protected void setJobName(String jobName) {
463        this.jobName = jobName;
464    }
465
466}