001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.List;
029import java.util.concurrent.RejectedExecutionException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.javasimon.SimonManager;
034import org.javasimon.Split;
035import org.javasimon.Stopwatch;
036import org.nuxeo.common.utils.ExceptionUtils;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CloseableCoreSession;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.blobholder.BlobHolder;
044import org.nuxeo.ecm.core.event.Event;
045import org.nuxeo.ecm.core.event.EventProducer;
046import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
047import org.nuxeo.ecm.core.event.impl.EventContextImpl;
048import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
049import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
050import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
051import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
052import org.nuxeo.ecm.platform.importer.source.SourceNode;
053import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.transaction.TransactionHelper;
056
057/**
058 * Generic importer task
059 *
060 * @author Thierry Delprat
061 */
062public class GenericThreadedImportTask implements Runnable {
063
064    public static final String DOC_IMPORTED_EVENT = "documentImportedWithPlatformImporter";
065
066    private static final Log log = LogFactory.getLog(GenericThreadedImportTask.class);
067
068    protected static int taskCounter = 0;
069
070    protected boolean isRunning = false;
071
072    protected long uploadedFiles = 0;
073
074    protected long uploadedKO;
075
076    protected int batchSize;
077
078    protected CoreSession session;
079
080    protected DocumentModel rootDoc;
081
082    protected SourceNode rootSource;
083
084    protected Boolean skipContainerCreation = false;
085
086    protected Boolean isRootTask = false;
087
088    protected String taskId = null;
089
090    public static final int TX_TIMEOUT = 600;
091
092    protected int transactionTimeout = TX_TIMEOUT;
093
094    protected ImporterThreadingPolicy threadPolicy;
095
096    protected ImporterDocumentModelFactory factory;
097
098    protected String jobName;
099
100    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
101
102    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
103
104    protected String repositoryName;
105
106    private static synchronized int getNextTaskId() {
107        taskCounter += 1;
108        return taskCounter;
109    }
110
111    protected ImporterLogger rsLogger = null;
112
113    protected GenericThreadedImportTask(CoreSession session) {
114        this.session = session;
115        uploadedFiles = 0;
116        taskId = "T" + getNextTaskId();
117    }
118
119    protected GenericThreadedImportTask(CoreSession session, SourceNode rootSource, DocumentModel rootDoc,
120            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
121            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy) {
122        this.rsLogger = rsLogger;
123        this.session = session;
124        this.batchSize = batchSize;
125        uploadedFiles = 0;
126        taskId = "T" + getNextTaskId();
127        this.rootSource = rootSource;
128        this.rootDoc = rootDoc;
129        this.skipContainerCreation = skipContainerCreation;
130        this.factory = factory;
131        this.threadPolicy = threadPolicy;
132
133        // there are documents without path, like versions
134        if (rootSource == null) {
135            throw new IllegalArgumentException("source node must be specified");
136        }
137    }
138
139    public GenericThreadedImportTask(String repositoryName, SourceNode rootSource, DocumentModel rootDoc,
140            boolean skipContainerCreation, ImporterLogger rsLogger, int batchSize,
141            ImporterDocumentModelFactory factory, ImporterThreadingPolicy threadPolicy, String jobName) {
142        this(null, rootSource, rootDoc, skipContainerCreation, rsLogger, batchSize, factory, threadPolicy);
143        this.jobName = jobName;
144        this.repositoryName = repositoryName;
145    }
146
147    protected CoreSession getCoreSession() {
148        return session;
149    }
150
151    protected void commit() {
152        commit(false);
153    }
154
155    protected void commit(boolean force) {
156        uploadedFiles++;
157        if (uploadedFiles % 10 == 0) {
158            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
159        }
160
161        if (uploadedFiles % batchSize == 0 || force) {
162            Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.session_save");
163            Split split = stopwatch.start();
164            fslog("Committing Core Session after " + uploadedFiles + " files", true);
165            session.save();
166            TransactionHelper.commitOrRollbackTransaction();
167            TransactionHelper.startTransaction(transactionTimeout);
168            split.stop();
169        }
170    }
171
172    protected DocumentModel doCreateFolderishNode(DocumentModel parent, SourceNode node) {
173        if (!shouldImportDocument(node)) {
174            return null;
175        }
176        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_folder");
177        Split split = stopwatch.start();
178        DocumentModel folder = null;
179        try {
180            folder = getFactory().createFolderishNode(session, parent, node);
181        } catch (IOException e) {
182            String errorMsg = "Unable to create folderish document for " + node.getSourcePath() + ":" + e
183                    + (e.getCause() != null ? e.getCause() : "");
184            fslog(errorMsg, true);
185            log.error(errorMsg);
186            // Process folderish node creation error and check if the global
187            // import task should continue
188            boolean shouldImportTaskContinue = getFactory().processFolderishNodeCreationError(session, parent, node);
189            if (!shouldImportTaskContinue) {
190                throw new NuxeoException(e);
191            }
192        } finally {
193            split.stop();
194        }
195        if (folder != null) {
196            String parentPath = (parent == null) ? "null" : parent.getPathAsString();
197            fslog("Created Folder " + folder.getName() + " at " + parentPath, true);
198
199            // save session if needed
200            commit();
201        }
202        return folder;
203
204    }
205
206    protected DocumentModel doCreateLeafNode(DocumentModel parent, SourceNode node) throws IOException {
207        if (!shouldImportDocument(node)) {
208            return null;
209        }
210        Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.create_leaf");
211        Split split = stopwatch.start();
212        DocumentModel leaf = null;
213        try {
214            leaf = getFactory().createLeafNode(session, parent, node);
215        } catch (IOException e) {
216            String errMsg = "Unable to create leaf document for " + node.getSourcePath() + ":" + e
217                    + (e.getCause() != null ? e.getCause() : "");
218            fslog(errMsg, true);
219            log.error(errMsg);
220            // Process leaf node creation error and check if the global
221            // import task should continue
222            boolean shouldImportTaskContinue = getFactory().processLeafNodeCreationError(session, parent, node);
223            if (!shouldImportTaskContinue) {
224                throw new NuxeoException(e);
225            }
226        } finally {
227            split.stop();
228        }
229        BlobHolder bh = node.getBlobHolder();
230        if (leaf != null && bh != null) {
231            Blob blob = bh.getBlob();
232            if (blob != null) {
233                long fileSize = blob.getLength();
234                String fileName = blob.getFilename();
235                if (fileSize > 0) {
236                    long kbSize = fileSize / 1024;
237                    String parentPath = (parent == null) ? "null" : parent.getPathAsString();
238                    fslog("Created doc " + leaf.getName() + " at " + parentPath + " with file " + fileName
239                            + " of size " + kbSize + "KB", true);
240                }
241                uploadedKO += fileSize;
242            }
243
244            // send an event about the imported document
245            EventProducer eventProducer = Framework.getService(EventProducer.class);
246            EventContextImpl eventContext = new DocumentEventContext(session, session.getPrincipal(), leaf);
247            Event event = eventContext.newEvent(DOC_IMPORTED_EVENT);
248            eventProducer.fireEvent(event);
249
250            // save session if needed
251            commit();
252        }
253        return leaf;
254    }
255
256    protected boolean shouldImportDocument(SourceNode node) {
257        for (ImportingDocumentFilter importingDocumentFilter : importingDocumentFilters) {
258            if (!importingDocumentFilter.shouldImportDocument(node)) {
259                return false;
260            }
261        }
262        return true;
263    }
264
265    protected GenericThreadedImportTask createNewTask(DocumentModel parent, SourceNode node, ImporterLogger log,
266            Integer batchSize) {
267        GenericThreadedImportTask newTask = new GenericThreadedImportTask(repositoryName, node, parent,
268                skipContainerCreation, log, batchSize, factory, threadPolicy, null);
269        newTask.addListeners(listeners);
270        newTask.addImportingDocumentFilters(importingDocumentFilters);
271        return newTask;
272    }
273
274    protected GenericThreadedImportTask createNewTaskIfNeeded(DocumentModel parent, SourceNode node) {
275        if (isRootTask) {
276            isRootTask = false; // don't fork Root thread on first folder
277            return null;
278        }
279        int scheduledTasks = GenericMultiThreadedImporter.getExecutor().getQueue().size();
280        boolean createTask = getThreadPolicy().needToCreateThreadAfterNewFolderishNode(parent, node, uploadedFiles,
281                batchSize, scheduledTasks);
282
283        if (createTask) {
284            GenericThreadedImportTask newTask = createNewTask(parent, node, rsLogger, batchSize);
285            newTask.setBatchSize(getBatchSize());
286            newTask.setSkipContainerCreation(true);
287            newTask.setTransactionTimeout(transactionTimeout);
288            return newTask;
289        } else {
290            return null;
291        }
292    }
293
294    protected void recursiveCreateDocumentFromNode(DocumentModel parent, SourceNode node) throws IOException {
295
296        if (getFactory().isTargetDocumentModelFolderish(node)) {
297            DocumentModel folder;
298            Boolean newThread = false;
299            if (skipContainerCreation) {
300                folder = parent;
301                skipContainerCreation = false;
302                newThread = true;
303            } else {
304                folder = doCreateFolderishNode(parent, node);
305                if (folder == null) {
306                    return;
307                }
308            }
309
310            // get a new TaskImporter if available to start
311            // processing the sub-tree
312            GenericThreadedImportTask task = null;
313            if (!newThread) {
314                task = createNewTaskIfNeeded(folder, node);
315            }
316            if (task != null) {
317                // force comit before starting new thread
318                commit(true);
319                try {
320                    GenericMultiThreadedImporter.getExecutor().execute(task);
321                } catch (RejectedExecutionException e) {
322                    log.error("Import task rejected", e);
323                }
324
325            } else {
326                Stopwatch stopwatch = SimonManager.getStopwatch("org.nuxeo.ecm.platform.importer.node_get_children");
327                Split split = stopwatch.start();
328                List<SourceNode> nodes = node.getChildren();
329                split.stop();
330                if (nodes != null) {
331                    for (SourceNode child : nodes) {
332                        recursiveCreateDocumentFromNode(folder, child);
333                    }
334                }
335            }
336        } else {
337            doCreateLeafNode(parent, node);
338        }
339    }
340
341    public void setInputSource(SourceNode node) {
342        this.rootSource = node;
343    }
344
345    public void setTargetFolder(DocumentModel rootDoc) {
346        this.rootDoc = rootDoc;
347    }
348
349    // TODO isRunning is not yet handled correctly
350    public boolean isRunning() {
351        synchronized (this) {
352            return isRunning;
353        }
354    }
355
356    @Override
357    public synchronized void run() {
358        synchronized (this) {
359            if (isRunning) {
360                throw new IllegalStateException("Task already running");
361            }
362            isRunning = true;
363            // versions have no path, target document can be null
364            if (rootSource == null) {
365                isRunning = false;
366                throw new IllegalArgumentException("source node must be specified");
367            }
368        }
369        TransactionHelper.startTransaction(transactionTimeout);
370        boolean completedAbruptly = true;
371        try (CloseableCoreSession closeableCoreSession = CoreInstance.openCoreSessionSystem(repositoryName)) {
372            session = closeableCoreSession;
373            log.info("Starting new import task");
374            Framework.doPrivileged(() -> {
375                if (rootDoc != null) {
376                    // reopen the root to be sure the session is valid
377                    rootDoc = session.getDocument(rootDoc.getRef());
378                }
379                try {
380                    recursiveCreateDocumentFromNode(rootDoc, rootSource);
381                } catch (IOException e) {
382                    throw new NuxeoException(e);
383                }
384                session.save();
385            });
386            GenericMultiThreadedImporter.addCreatedDoc(taskId, uploadedFiles);
387            completedAbruptly = false;
388        } catch (Exception e) { // deals with interrupt below
389            log.error("Error during import", e);
390            ExceptionUtils.checkInterrupt(e);
391            notifyImportError();
392        } finally {
393            log.info("End of task");
394            session = null;
395            if (completedAbruptly) {
396                TransactionHelper.setTransactionRollbackOnly();
397            }
398            TransactionHelper.commitOrRollbackTransaction();
399            synchronized (this) {
400                isRunning = false;
401            }
402        }
403    }
404
405    // This should be done with log4j but I did not find a way to configure it
406    // the way I wanted ...
407    protected void fslog(String msg, boolean debug) {
408        if (debug) {
409            rsLogger.debug(msg);
410        } else {
411            rsLogger.info(msg);
412        }
413    }
414
415    public int getBatchSize() {
416        return batchSize;
417    }
418
419    public void setBatchSize(int batchSize) {
420        this.batchSize = batchSize;
421    }
422
423    public void setSkipContainerCreation(Boolean skipContainerCreation) {
424        this.skipContainerCreation = skipContainerCreation;
425    }
426
427    public void setRootTask() {
428        isRootTask = true;
429        taskCounter = 0;
430        taskId = "T0";
431    }
432
433    protected ImporterThreadingPolicy getThreadPolicy() {
434        return threadPolicy;
435    }
436
437    protected ImporterDocumentModelFactory getFactory() {
438        return factory;
439    }
440
441    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
442        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
443    }
444
445    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
446        this.importingDocumentFilters.addAll(importingDocumentFilters);
447    }
448
449    public void addListeners(ImporterListener... listeners) {
450        addListeners(Arrays.asList(listeners));
451    }
452
453    public void addListeners(Collection<ImporterListener> listeners) {
454        this.listeners.addAll(listeners);
455    }
456
457    public void setTransactionTimeout(int transactionTimeout) {
458        this.transactionTimeout = transactionTimeout < 1 ? TX_TIMEOUT : transactionTimeout;
459    }
460
461    protected void notifyImportError() {
462        for (ImporterListener listener : listeners) {
463            listener.importError();
464        }
465    }
466
467    protected void setRootDoc(DocumentModel rootDoc) {
468        this.rootDoc = rootDoc;
469    }
470
471    protected void setRootSource(SourceNode rootSource) {
472        this.rootSource = rootSource;
473    }
474
475    protected void setFactory(ImporterDocumentModelFactory factory) {
476        this.factory = factory;
477    }
478
479    protected void setRsLogger(ImporterLogger rsLogger) {
480        this.rsLogger = rsLogger;
481    }
482
483    protected void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
484        this.threadPolicy = threadPolicy;
485    }
486
487    protected void setJobName(String jobName) {
488        this.jobName = jobName;
489    }
490
491}