001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.javasimon.SimonManager;
038import org.javasimon.Stopwatch;
039import org.nuxeo.common.utils.ExceptionUtils;
040import org.nuxeo.ecm.core.api.CoreInstance;
041import org.nuxeo.ecm.core.api.CoreSession;
042import org.nuxeo.ecm.core.api.DocumentModel;
043import org.nuxeo.ecm.core.api.DocumentNotFoundException;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.core.api.PathRef;
046import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
047import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
048import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
049import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
050import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
051import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
052import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
053import org.nuxeo.ecm.platform.importer.log.PerfLogger;
054import org.nuxeo.ecm.platform.importer.source.SourceNode;
055import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
056import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
057
058/**
059 * Generic importer
060 *
061 * @author Thierry Delprat
062 */
063public class GenericMultiThreadedImporter implements ImporterRunner {
064
065    protected static ThreadPoolExecutor importTP;
066
067    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
068
069    protected ImporterThreadingPolicy threadPolicy;
070
071    protected ImporterDocumentModelFactory factory;
072
073    protected SourceNode importSource;
074
075    protected DocumentModel targetContainer;
076
077    protected Integer batchSize = 50;
078
079    protected Integer nbThreads = 5;
080
081    protected Integer transactionTimeout = 0;
082
083    protected ImporterLogger log;
084
085    protected CoreSession session;
086
087    protected String importWritePath;
088
089    protected Boolean skipRootContainerCreation = false;
090
091    protected String jobName;
092
093    protected boolean enablePerfLogging = true;
094
095    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
096
097    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
098
099    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
100
101    protected GenericThreadedImportTask rootImportTask;
102
103    protected final static int DEFAULT_QUEUE_SIZE = 10000;
104
105    protected int queueSize = DEFAULT_QUEUE_SIZE;
106
107    protected String repositoryName;
108
109    public static ThreadPoolExecutor getExecutor() {
110        return importTP;
111    }
112
113    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
114        String tid = Thread.currentThread().getName();
115        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
116    }
117
118    public static synchronized long getCreatedDocsCounter() {
119        long counter = 0;
120        for (String tid : nbCreatedDocsByThreads.keySet()) {
121            Long tCounter = nbCreatedDocsByThreads.get(tid);
122            if (tCounter != null) {
123                counter += tCounter;
124            }
125        }
126        return counter;
127    }
128
129    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
130            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
131        importSource = sourceNode;
132        this.importWritePath = importWritePath;
133        this.log = log;
134        if (batchSize != null) {
135            this.batchSize = batchSize;
136        }
137        if (nbThreads != null) {
138            this.nbThreads = nbThreads;
139        }
140        if (skipRootContainerCreation != null) {
141            this.skipRootContainerCreation = skipRootContainerCreation;
142        }
143    }
144
145    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
146            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
147        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
148    }
149
150    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
151            Integer nbThreads, ImporterLogger log) {
152        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
153    }
154
155    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
156            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
157
158        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
159        this.jobName = jobName;
160        if (jobName != null) {
161            listeners.add(new JobHistoryListener(jobName));
162        }
163    }
164
165    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
166            Integer nbThreads, String jobName, ImporterLogger log) {
167        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
168    }
169
170    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
171        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
172                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
173        repositoryName = configuration.repositoryName;
174    }
175
176    public void addFilter(ImporterFilter filter) {
177        log.debug(String.format(
178                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
179                filter.toString(), this.hashCode(), importSource.getName()));
180        filters.add(filter);
181    }
182
183    public void addListeners(ImporterListener... listeners) {
184        addListeners(Arrays.asList(listeners));
185    }
186
187    public void addListeners(Collection<ImporterListener> listeners) {
188        this.listeners.addAll(listeners);
189    }
190
191    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
192        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
193    }
194
195    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
196        this.importingDocumentFilters.addAll(importingDocumentFilters);
197    }
198
199    @Override
200    public void run() {
201        Exception finalException = null;
202        try {
203            session = CoreInstance.openCoreSessionSystem(repositoryName);
204            for (ImporterFilter filter : filters) {
205                log.debug(String.format(
206                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
207                        filter.toString(), this.hashCode(), importSource.getName()));
208                filter.handleBeforeImport();
209            }
210            if (filters.size() == 0) {
211                log.debug(String.format(
212                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
213                        this.hashCode(), importSource.getName()));
214            }
215            doRun();
216        } catch (Exception e) { // deals with interrupt below
217            ExceptionUtils.checkInterrupt(e);
218            log.error("Task exec failed", e);
219            finalException = e;
220        } finally {
221            for (ImporterFilter filter : filters) {
222                filter.handleAfterImport(finalException);
223            }
224            if (session != null) {
225                session.close();
226                session = null;
227            }
228        }
229    }
230
231    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
232        this.rootImportTask = rootImportTask;
233    }
234
235    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
236            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
237        if (rootImportTask == null) {
238            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
239                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
240        } else {
241            rootImportTask.setInputSource(importSource);
242            rootImportTask.setTargetFolder(targetContainer);
243            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
244            rootImportTask.setRsLogger(log);
245            rootImportTask.setFactory(getFactory());
246            rootImportTask.setThreadPolicy(getThreadPolicy());
247            rootImportTask.setJobName(jobName);
248            rootImportTask.setBatchSize(batchSize);
249        }
250        rootImportTask.addListeners(listeners);
251        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
252        rootImportTask.setTransactionTimeout(transactionTimeout);
253        return rootImportTask;
254    }
255
256    /**
257     * Creates non-daemon threads at normal priority.
258     */
259    public static class NamedThreadFactory implements ThreadFactory {
260
261        private final AtomicInteger threadNumber = new AtomicInteger();
262
263        private final ThreadGroup group;
264
265        private final String prefix;
266
267        public NamedThreadFactory(String prefix) {
268            SecurityManager sm = System.getSecurityManager();
269            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
270            this.prefix = prefix;
271        }
272
273        @Override
274        public Thread newThread(Runnable r) {
275            String name = prefix + threadNumber.incrementAndGet();
276            Thread thread = new Thread(group, r, name);
277            // do not set daemon
278            thread.setPriority(Thread.NORM_PRIORITY);
279            return thread;
280        }
281    }
282
283    protected void doRun() throws IOException {
284
285        targetContainer = getTargetContainer();
286
287        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
288
289        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
290                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
291
292        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
293
294        rootImportTask.setRootTask();
295        long t0 = System.currentTimeMillis();
296
297        notifyBeforeImport();
298
299        importTP.execute(rootImportTask);
300        sleep(200);
301        int activeTasks = importTP.getActiveCount();
302        int oldActiveTasks = 0;
303        long lastLogProgressTime = System.currentTimeMillis();
304        long lastCreatedDocCounter = 0;
305
306        String[] headers = { "nbDocs", "average", "imediate" };
307        PerfLogger perfLogger = new PerfLogger(headers);
308        while (activeTasks > 0) {
309            sleep(500);
310            activeTasks = importTP.getActiveCount();
311            boolean logProgress = false;
312            if (oldActiveTasks != activeTasks) {
313                oldActiveTasks = activeTasks;
314                log.debug("currently " + activeTasks + " active import Threads");
315                logProgress = true;
316
317            }
318            long ti = System.currentTimeMillis();
319            if (ti - lastLogProgressTime > 5000) {
320                logProgress = true;
321            }
322            if (logProgress) {
323                long inbCreatedDocs = getCreatedDocsCounter();
324                long deltaT = ti - lastLogProgressTime;
325                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
326                double imediateSpeed = averageSpeed;
327                if (deltaT > 0) {
328                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
329                }
330                log.info(inbCreatedDocs + " docs created");
331                log.info("average speed = " + averageSpeed + " docs/s");
332                log.info("immediate speed = " + imediateSpeed + " docs/s");
333
334                if (enablePerfLogging) {
335                    Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed };
336                    perfLogger.log(perfData);
337                }
338
339                lastLogProgressTime = ti;
340                lastCreatedDocCounter = inbCreatedDocs;
341            }
342        }
343        stopImportProcrocess();
344        log.info("All Threads terminated");
345        perfLogger.release();
346        notifyAfterImport();
347
348        long t1 = System.currentTimeMillis();
349        long nbCreatedDocs = getCreatedDocsCounter();
350        log.info(nbCreatedDocs + " docs created");
351        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
352        for (String k : nbCreatedDocsByThreads.keySet()) {
353            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
354        }
355        Stopwatch stopwatch;
356        for (String name : SimonManager.simonNames()) {
357            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
358                continue;
359            }
360            stopwatch = SimonManager.getStopwatch(name);
361            if (stopwatch.getCounter() > 0) {
362                log.info(stopwatch.toString());
363            }
364        }
365
366    }
367
368    protected static void sleep(int millis) {
369        try {
370            Thread.sleep(millis);
371        } catch (InterruptedException e) {
372            // restore interrupt status
373            Thread.currentThread().interrupt();
374            throw new NuxeoException(e);
375        }
376    }
377
378    protected DocumentModel getTargetContainer() {
379        if (targetContainer == null) {
380            targetContainer = createTargetContainer();
381        }
382        return targetContainer;
383    }
384
385    /**
386     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
387     * overridden in subclasses.
388     *
389     * @return
390     */
391    protected DocumentModel createTargetContainer() {
392        try {
393            return session.getDocument(new PathRef(importWritePath));
394        } catch (DocumentNotFoundException e) {
395            log.error(e.getMessage());
396            throw e;
397        }
398    }
399
400    public ImporterThreadingPolicy getThreadPolicy() {
401        if (threadPolicy == null) {
402            threadPolicy = new DefaultMultiThreadingPolicy();
403        }
404        return threadPolicy;
405    }
406
407    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
408        this.threadPolicy = threadPolicy;
409    }
410
411    public ImporterDocumentModelFactory getFactory() {
412        if (factory == null) {
413            factory = new DefaultDocumentModelFactory();
414        }
415        return factory;
416    }
417
418    public void setFactory(ImporterDocumentModelFactory factory) {
419        this.factory = factory;
420    }
421
422    /**
423     * @since 5.9.4
424     */
425    public void setTransactionTimeout(int transactionTimeout) {
426        this.transactionTimeout = transactionTimeout;
427    }
428
429    public void setEnablePerfLogging(boolean enablePerfLogging) {
430        this.enablePerfLogging = enablePerfLogging;
431    }
432
433    public void stopImportProcrocess() {
434        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
435            importTP.shutdownNow();
436        }
437    }
438
439    protected void notifyBeforeImport() {
440        for (ImporterListener listener : listeners) {
441            listener.beforeImport();
442        }
443    }
444
445    protected void notifyAfterImport() {
446        for (ImporterListener listener : listeners) {
447            listener.afterImport();
448        }
449    }
450
451    /**
452     * @since 7.1
453     */
454    public String getRepositoryName() {
455        return repositoryName;
456    }
457
458    /**
459     * @since 7.1
460     */
461    public void setRepositoryName(String repositoryName) {
462        this.repositoryName = repositoryName;
463    }
464
465}