001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.RejectedExecutionException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.javasimon.SimonManager;
034import org.javasimon.Split;
035import org.javasimon.Stopwatch;
036import org.nuxeo.common.utils.ExceptionUtils;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.blobholder.BlobHolder;
043import org.nuxeo.ecm.core.event.Event;
044import org.nuxeo.ecm.core.event.EventProducer;
045import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
046import org.nuxeo.ecm.core.event.impl.EventContextImpl;
047import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
048import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
049import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
050import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
051import org.nuxeo.ecm.platform.importer.source.SourceNode;
052import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
053import org.nuxeo.runtime.api.Framework;
054import org.nuxeo.runtime.transaction.TransactionHelper;
055
056/**
057 * Generic importer task
058 *
059 * @author Thierry Delprat
060 */
061public class GenericThreadedImportTask implements Runnable {
062
063    public static final String DOC_IMPORTED_EVENT = "documentImportedWithPlatformImporter";
064
065    private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class);
066
067    protected static int taskCounter = 0;
068
069    protected boolean isRunning = false;
070
071    protected long uploadedFiles = 0;
072
073    protected long uploadedKO;
074
075    protected int batchSize;
076
077    protected CoreSession session;
078
079    protected DocumentModel rootDoc;
080
081    protected SourceNode rootSource;
082
083    protected Boolean skipContainerCreation = false;
084
085    protected Boolean isRootTask = false;
086
087    protected String taskId = null;
088
089    public static final int TX_TIMEOUT = 600;
090
091    protected int transactionTimeout = TX_TIMEOUT;
092
093    protected ImporterThreadingPolicy threadPolicy;
094
095    protected ImporterDocumentModelFactory factory;
096
097    protected String jobName;
098
099    protected List<ImporterListener> listeners = new ArrayList<>();
100
101    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<>();
102
103    protected String repositoryName;
104
105    private static synchronized int getNextTaskId() {
106        taskCounter += 1;
107        return taskCounter;
108    }
109
110    protected ImporterLogger rsLogger = null;
111
112    protected GenericThreadedImportTask(CoreSession session) {
113        this.session = session;
114        uploadedFiles = 0;
115        taskId = "T" + getNextTaskId();
116    }
117
118    protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc,
119            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
120            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) {
121        this.rsLogger = rsLogger;
122        this.session = session;
123        this.batchSize = batchSize;
124        uploadedFiles = 0;
125        taskId = "T" + getNextTaskId();
126        this.rootSource = rootSource;
127        this.rootDoc = rootDoc;
128        this.skipContainerCreation = skipContainerCreation;
129        this.factory = factory;
130        this.threadPolicy = threadPolicy;
131
132        // there are documents without path, like versions
133        if (rootSource == null) {
134            throw new IllegalArgumentException("source node must be specified");
135        }
136    }
137
138    public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc,
139            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
140            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) {
141        this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy);
142        this.jobName = jobName;
143        this.repositoryName = repositoryName;
144    }
145
146    protected CoreSession getCoreSession() {
147        return session;
148    }
149
150    protected void commit() {
151        commit(false);
152    }
153
154    protected void commit(boolean force) {
155        uploadedFiles++;
156        if (uploadedFiles % 10 == 0) {
157            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
158        }
159
160        if (uploadedFiles % batchSize == 0 || force) {
161            Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save");
162            Split split = stopwatch.start();
163            fslog("Committing Core Session after " + uploadedFiles + " files", true);
164            session.save();
165            TransactionHelper.commitOrRollbackTransaction();
166            TransactionHelper.startTransaction(transactionTimeout);
167            split.stop();
168        }
169    }
170
171    protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) {
172        if (!shouldImportDocument(node)) {
173            return null;
174        }
175        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder");
176        Split split = stopwatch.start();
177        DocumentModel folder = null;
178        try {
179            folder = getFactory().createFolderishNode(session, parent, node);
180        } catch (IOException e) {
181            String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e
182                    + (e.getCause() != null ? e.getCause() : "");
183            fslog(errorMsg, true);
184            log.error(errorMsg);
185            // Process folderish node creation error and check if the global
186            // import task should continue
187            boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node);
188            if (!shouldImportTaskContinue) {
189                throw new NuxeoException(e);
190            }
191        } finally {
192            split.stop();
193        }
194        if (folder != null) {
195            String parentPath = (parent == null) ? "null" : parent.getPathAsString();
196            fslog("Created Folder " + folder.getName() + " at " + parentPath, true);
197
198            // save session if needed
199            commit();
200        }
201        return folder;
202
203    }
204
205    protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException {
206        if (!shouldImportDocument(node)) {
207            return null;
208        }
209        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf");
210        Split split = stopwatch.start();
211        DocumentModel leaf = null;
212        try {
213            leaf = getFactory().createLeafNode(session, parent, node);
214        } catch (IOException e) {
215            String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e
216                    + (e.getCause() != null ? e.getCause() : "");
217            fslog(errMsg, true);
218            log.error(errMsg);
219            // Process leaf node creation error and check if the global
220            // import task should continue
221            boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node);
222            if (!shouldImportTaskContinue) {
223                throw new NuxeoException(e);
224            }
225        } finally {
226            split.stop();
227        }
228        BlobHolder bh = node.getBlobHolder();
229        if (leaf != null && bh != null) {
230            Blob blob = bh.getBlob();
231            if (blob != null) {
232                long fileSize = blob.getLength();
233                String fileName = blob.getFilename();
234                if (fileSize > 0) {
235                    long kbSize = fileSize / 1024;
236                    String parentPath = (parent == null) ? "null" : parent.getPathAsString();
237                    fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName
238                            + " of size " + kbSize + "KB", true);
239                }
240                uploadedKO += fileSize;
241            }
242
243            // send an event about the imported document
244            EventProducer eventProducer = Framework.getService(EventProducer.class);
245            EventContextImpl eventContext = new DocumentEventContext(session, session.getPrincipal(), leaf);
246            Event event = eventContext.newEvent(DOC_IMPORTED_EVENT);
247            eventProducer.fireEvent(event);
248
249            // save session if needed
250            commit();
251        }
252        return leaf;
253    }
254
255    protected boolean shouldImportDocument(SourceNode node) {
256        for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) {
257            if (!importingDocumentFilter.shouldImportDocument(node)) {
258                return false;
259            }
260        }
261        return true;
262    }
263
264    protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log,
265            Integer batchSize) {
266        GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent,
267                skipContainerCreation, log, batchSize, factory, threadPolicy, null);
268        newTask.addListeners(listeners);
269        newTask.addImportingDocumentFilters(importingDocumentFilters);
270        return newTask;
271    }
272
273    protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) {
274        if (isRootTask) {
275            isRootTask = false; // don't fork Root thread on first folder
276            return null;
277        }
278        int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size();
279        boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles,
280                batchSize, scheduledTasks);
281
282        if (createTask) {
283            GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize);
284            newTask.setBatchSize(getBatchSize());
285            newTask.setSkipContainerCreation(true);
286            newTask.setTransactionTimeout(transactionTimeout);
287            return newTask;
288        } else {
289            return null;
290        }
291    }
292
293    protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException {
294
295        if (getFactory().isTargetDocumentModelFolderish(node)) {
296            DocumentModel folder;
297            Boolean newThread = false;
298            if (skipContainerCreation) {
299                folder = parent;
300                skipContainerCreation = false;
301                newThread = true;
302            } else {
303                folder = doCreateFolderishNode(parent, node);
304                if (folder == null) {
305                    return;
306                }
307            }
308
309            // get a new TaskImporter if available to start
310            // processing the sub-tree
311            GenericThreadedImportTask task = null;
312            if (!newThread) {
313                task = createNewTaskIfNeeded(folder, node);
314            }
315            if (task != null) {
316                // force comit before starting new thread
317                commit(true);
318                try {
319                    GenericMultiThreadedImporter.getExecutor().execute(task);
320                } catch (RejectedExecutionException e) {
321                    log.error("Import task rejected", e);
322                }
323
324            } else {
325                Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children");
326                Split split = stopwatch.start();
327                List<SourceNode> nodes = node.getChildren();
328                split.stop();
329                if (nodes != null) {
330                    for (SourceNode child : nodes) {
331                        recursiveCreateDocumentFromNode(folder, child);
332                    }
333                }
334            }
335        } else {
336            doCreateLeafNode(parent, node);
337        }
338    }
339
340    public void setInputSource(SourceNode node) {
341        this.rootSource = node;
342    }
343
344    public void setTargetFolder(DocumentModel rootDoc) {
345        this.rootDoc = rootDoc;
346    }
347
348    // TODO isRunning is not yet handled correctly
349    public boolean isRunning() {
350        synchronized (this) {
351            return isRunning;
352        }
353    }
354
355    @Override
356    public synchronized void run() {
357        synchronized (this) {
358            if (isRunning) {
359                throw new IllegalStateException("Task already running");
360            }
361            isRunning = true;
362            // versions have no path, target document can be null
363            if (rootSource == null) {
364                isRunning = false;
365                throw new IllegalArgumentException("source node must be specified");
366            }
367        }
368        TransactionHelper.startTransaction(transactionTimeout);
369        boolean completedAbruptly = true;
370        try {
371            session = CoreInstance.getCoreSessionSystem(repositoryName);
372            log.info("Starting new import task");
373            Framework.doPrivileged(() -> {
374                if (rootDoc != null) {
375                    // reopen the root to be sure the session is valid
376                    rootDoc = session.getDocument(rootDoc.getRef());
377                }
378                try {
379                    recursiveCreateDocumentFromNode(rootDoc, rootSource);
380                } catch (IOException e) {
381                    throw new NuxeoException(e);
382                }
383                session.save();
384            });
385            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
386            completedAbruptly = false;
387        } catch (Exception e) { // deals with interrupt below
388            log.error("Error during import", e);
389            ExceptionUtils.checkInterrupt(e);
390            notifyImportError();
391        } finally {
392            log.info("End of task");
393            if (completedAbruptly) {
394                TransactionHelper.setTransactionRollbackOnly();
395            }
396            TransactionHelper.commitOrRollbackTransaction();
397            synchronized (this) {
398                isRunning = false;
399            }
400        }
401    }
402
403    // This should be done with log4j but I did not find a way to configure it
404    // the way I wanted ...
405    protected void fslog(String msg, boolean debug) {
406        if (debug) {
407            rsLogger.debug(msg);
408        } else {
409            rsLogger.info(msg);
410        }
411    }
412
413    public int getBatchSize() {
414        return batchSize;
415    }
416
417    public void setBatchSize(int batchSize) {
418        this.batchSize = batchSize;
419    }
420
421    public void setSkipContainerCreation(Boolean skipContainerCreation) {
422        this.skipContainerCreation = skipContainerCreation;
423    }
424
425    public void setRootTask() {
426        isRootTask = true;
427        taskCounter = 0;
428        taskId = "T0";
429    }
430
431    protected ImporterThreadingPolicy getThreadPolicy() {
432        return threadPolicy;
433    }
434
435    protected ImporterDocumentModelFactory getFactory() {
436        return factory;
437    }
438
439    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
440        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
441    }
442
443    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
444        this.importingDocumentFilters.addAll(importingDocumentFilters);
445    }
446
447    public void addListeners(ImporterListener... listeners) {
448        addListeners(Arrays.asList(listeners));
449    }
450
451    public void addListeners(Collection<ImporterListener> listeners) {
452        this.listeners.addAll(listeners);
453    }
454
455    public void setTransactionTimeout(int transactionTimeout) {
456        this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout;
457    }
458
459    protected void notifyImportError() {
460        for (ImporterListener listener : listeners) {
461            listener.importError();
462        }
463    }
464
465    protected void setRootDoc(DocumentModel rootDoc) {
466        this.rootDoc = rootDoc;
467    }
468
469    protected void setRootSource(SourceNode rootSource) {
470        this.rootSource = rootSource;
471    }
472
473    protected void setFactory(ImporterDocumentModelFactory factory) {
474        this.factory = factory;
475    }
476
477    protected void setRsLogger(ImporterLogger rsLogger) {
478        this.rsLogger = rsLogger;
479    }
480
481    protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
482        this.threadPolicy = threadPolicy;
483    }
484
485    protected void setJobName(String jobName) {
486        this.jobName = jobName;
487    }
488
489}