001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.RejectedExecutionException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.javasimon.SimonManager;
034import org.javasimon.Split;
035import org.javasimon.Stopwatch;
036import org.nuxeo.common.utils.ExceptionUtils;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.blobholder.BlobHolder;
043import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
044import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
045import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
046import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
047import org.nuxeo.ecm.platform.importer.source.SourceNode;
048import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
049import org.nuxeo.runtime.transaction.TransactionHelper;
050
051/**
052 * Generic importer task
053 *
054 * @author Thierry Delprat
055 */
056public class GenericThreadedImportTask implements Runnable {
057
058    private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class);
059
060    protected static int taskCounter = 0;
061
062    protected boolean isRunning = false;
063
064    protected long uploadedFiles = 0;
065
066    protected long uploadedKO;
067
068    protected int batchSize;
069
070    protected CoreSession session;
071
072    protected DocumentModel rootDoc;
073
074    protected SourceNode rootSource;
075
076    protected Boolean skipContainerCreation = false;
077
078    protected Boolean isRootTask = false;
079
080    protected String taskId = null;
081
082    public static final int TX_TIMEOUT = 600;
083
084    protected int transactionTimeout = TX_TIMEOUT;
085
086    protected ImporterThreadingPolicy threadPolicy;
087
088    protected ImporterDocumentModelFactory factory;
089
090    protected String jobName;
091
092    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
093
094    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
095
096    protected String repositoryName;
097
098    private static synchronized int getNextTaskId() {
099        taskCounter += 1;
100        return taskCounter;
101    }
102
103    protected ImporterLogger rsLogger = null;
104
105    protected GenericThreadedImportTask(CoreSession session) {
106        this.session = session;
107        uploadedFiles = 0;
108        taskId = "T" + getNextTaskId();
109    }
110
111    protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc,
112            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
113            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) {
114        this.rsLogger = rsLogger;
115        this.session = session;
116        this.batchSize = batchSize;
117        uploadedFiles = 0;
118        taskId = "T" + getNextTaskId();
119        this.rootSource = rootSource;
120        this.rootDoc = rootDoc;
121        this.skipContainerCreation = skipContainerCreation;
122        this.factory = factory;
123        this.threadPolicy = threadPolicy;
124
125        // there are documents without path, like versions
126        if (rootSource == null) {
127            throw new IllegalArgumentException("source node must be specified");
128        }
129    }
130
131    public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc,
132            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
133            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) {
134        this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy);
135        this.jobName = jobName;
136        this.repositoryName = repositoryName;
137    }
138
139    protected CoreSession getCoreSession() {
140        return session;
141    }
142
143    protected void commit() {
144        commit(false);
145    }
146
147    protected void commit(boolean force) {
148        uploadedFiles++;
149        if (uploadedFiles % 10 == 0) {
150            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
151        }
152
153        if (uploadedFiles % batchSize == 0 || force) {
154            Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save");
155            Split split = stopwatch.start();
156            fslog("Committing Core Session after " + uploadedFiles + " files", true);
157            session.save();
158            TransactionHelper.commitOrRollbackTransaction();
159            TransactionHelper.startTransaction(transactionTimeout);
160            split.stop();
161        }
162    }
163
164    protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) {
165        if (!shouldImportDocument(node)) {
166            return null;
167        }
168        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder");
169        Split split = stopwatch.start();
170        DocumentModel folder = null;
171        try {
172            folder = getFactory().createFolderishNode(session, parent, node);
173        } catch (IOException e) {
174            String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e
175                    + (e.getCause() != null ? e.getCause() : "");
176            fslog(errorMsg, true);
177            log.error(errorMsg);
178            // Process folderish node creation error and check if the global
179            // import task should continue
180            boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node);
181            if (!shouldImportTaskContinue) {
182                throw new NuxeoException(e);
183            }
184        } finally {
185            split.stop();
186        }
187        if (folder != null) {
188            String parentPath = (parent == null) ? "null" : parent.getPathAsString();
189            fslog("Created Folder " + folder.getName() + " at " + parentPath, true);
190
191            // save session if needed
192            commit();
193        }
194        return folder;
195
196    }
197
198    protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException {
199        if (!shouldImportDocument(node)) {
200            return null;
201        }
202        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf");
203        Split split = stopwatch.start();
204        DocumentModel leaf = null;
205        try {
206            leaf = getFactory().createLeafNode(session, parent, node);
207        } catch (IOException e) {
208            String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e
209                    + (e.getCause() != null ? e.getCause() : "");
210            fslog(errMsg, true);
211            log.error(errMsg);
212            // Process leaf node creation error and check if the global
213            // import task should continue
214            boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node);
215            if (!shouldImportTaskContinue) {
216                throw new NuxeoException(e);
217            }
218        } finally {
219            split.stop();
220        }
221        BlobHolder bh = node.getBlobHolder();
222        if (leaf != null && bh != null) {
223            Blob blob = bh.getBlob();
224            if (blob != null) {
225                long fileSize = blob.getLength();
226                String fileName = blob.getFilename();
227                if (fileSize > 0) {
228                    long kbSize = fileSize / 1024;
229                    String parentPath = (parent == null) ? "null" : parent.getPathAsString();
230                    fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName
231                            + " of size " + kbSize + "KB", true);
232                }
233                uploadedKO += fileSize;
234            }
235
236            // save session if needed
237            commit();
238        }
239        return leaf;
240    }
241
242    protected boolean shouldImportDocument(SourceNode node) {
243        for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) {
244            if (!importingDocumentFilter.shouldImportDocument(node)) {
245                return false;
246            }
247        }
248        return true;
249    }
250
251    protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log,
252            Integer batchSize) {
253        GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent,
254                skipContainerCreation, log, batchSize, factory, threadPolicy, null);
255        newTask.addListeners(listeners);
256        newTask.addImportingDocumentFilters(importingDocumentFilters);
257        return newTask;
258    }
259
260    protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) {
261        if (isRootTask) {
262            isRootTask = false; // don't fork Root thread on first folder
263            return null;
264        }
265        int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size();
266        boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles,
267                batchSize, scheduledTasks);
268
269        if (createTask) {
270            GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize);
271            newTask.setBatchSize(getBatchSize());
272            newTask.setSkipContainerCreation(true);
273            newTask.setTransactionTimeout(transactionTimeout);
274            return newTask;
275        } else {
276            return null;
277        }
278    }
279
280    protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException {
281
282        if (getFactory().isTargetDocumentModelFolderish(node)) {
283            DocumentModel folder;
284            Boolean newThread = false;
285            if (skipContainerCreation) {
286                folder = parent;
287                skipContainerCreation = false;
288                newThread = true;
289            } else {
290                folder = doCreateFolderishNode(parent, node);
291                if (folder == null) {
292                    return;
293                }
294            }
295
296            // get a new TaskImporter if available to start
297            // processing the sub-tree
298            GenericThreadedImportTask task = null;
299            if (!newThread) {
300                task = createNewTaskIfNeeded(folder, node);
301            }
302            if (task != null) {
303                // force comit before starting new thread
304                commit(true);
305                try {
306                    GenericMultiThreadedImporter.getExecutor().execute(task);
307                } catch (RejectedExecutionException e) {
308                    log.error("Import task rejected", e);
309                }
310
311            } else {
312                Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children");
313                Split split = stopwatch.start();
314                List<SourceNode> nodes = node.getChildren();
315                split.stop();
316                if (nodes != null) {
317                    for (SourceNode child : nodes) {
318                        recursiveCreateDocumentFromNode(folder, child);
319                    }
320                }
321            }
322        } else {
323            doCreateLeafNode(parent, node);
324        }
325    }
326
327    public void setInputSource(SourceNode node) {
328        this.rootSource = node;
329    }
330
331    public void setTargetFolder(DocumentModel rootDoc) {
332        this.rootDoc = rootDoc;
333    }
334
335    // TODO isRunning is not yet handled correctly
336    public boolean isRunning() {
337        synchronized (this) {
338            return isRunning;
339        }
340    }
341
342    public synchronized void run() {
343        TransactionHelper.startTransaction(transactionTimeout);
344        synchronized (this) {
345            if (isRunning) {
346                throw new IllegalStateException("Task already running");
347            }
348            isRunning = true;
349            // versions have no path, target document can be null
350            if (rootSource == null) {
351                isRunning = false;
352                throw new IllegalArgumentException("source node must be specified");
353            }
354        }
355        try {
356            session = CoreInstance.openCoreSessionSystem(repositoryName);
357            log.info("Starting new import task");
358            if (rootDoc != null) {
359                // reopen the root to be sure the session is valid
360                rootDoc = session.getDocument(rootDoc.getRef());
361            }
362            recursiveCreateDocumentFromNode(rootDoc, rootSource);
363            session.save();
364            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
365            TransactionHelper.commitOrRollbackTransaction();
366        } catch (Exception e) { // deals with interrupt below
367            log.error("Error during import", e);
368            ExceptionUtils.checkInterrupt(e);
369            notifyImportError();
370        } finally {
371            log.info("End of task");
372            if (session != null) {
373                session.close();
374                session = null;
375            }
376            synchronized (this) {
377                isRunning = false;
378            }
379        }
380    }
381
382    // This should be done with log4j but I did not find a way to configure it
383    // the way I wanted ...
384    protected void fslog(String msg, boolean debug) {
385        if (debug) {
386            rsLogger.debug(msg);
387        } else {
388            rsLogger.info(msg);
389        }
390    }
391
392    public int getBatchSize() {
393        return batchSize;
394    }
395
396    public void setBatchSize(int batchSize) {
397        this.batchSize = batchSize;
398    }
399
400    public void setSkipContainerCreation(Boolean skipContainerCreation) {
401        this.skipContainerCreation = skipContainerCreation;
402    }
403
404    public void setRootTask() {
405        isRootTask = true;
406        taskCounter = 0;
407        taskId = "T0";
408    }
409
410    protected ImporterThreadingPolicy getThreadPolicy() {
411        return threadPolicy;
412    }
413
414    protected ImporterDocumentModelFactory getFactory() {
415        return factory;
416    }
417
418    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
419        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
420    }
421
422    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
423        this.importingDocumentFilters.addAll(importingDocumentFilters);
424    }
425
426    public void addListeners(ImporterListener... listeners) {
427        addListeners(Arrays.asList(listeners));
428    }
429
430    public void addListeners(Collection<ImporterListener> listeners) {
431        this.listeners.addAll(listeners);
432    }
433
434    public void setTransactionTimeout(int transactionTimeout) {
435        this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout;
436    }
437
438    protected void notifyImportError() {
439        for (ImporterListener listener : listeners) {
440            listener.importError();
441        }
442    }
443
444    protected void setRootDoc(DocumentModel rootDoc) {
445        this.rootDoc = rootDoc;
446    }
447
448    protected void setRootSource(SourceNode rootSource) {
449        this.rootSource = rootSource;
450    }
451
452    protected void setFactory(ImporterDocumentModelFactory factory) {
453        this.factory = factory;
454    }
455
456    protected void setRsLogger(ImporterLogger rsLogger) {
457        this.rsLogger = rsLogger;
458    }
459
460    protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
461        this.threadPolicy = threadPolicy;
462    }
463
464    protected void setJobName(String jobName) {
465        this.jobName = jobName;
466    }
467
468}