001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import org.javasimon.SimonManager;
025import org.javasimon.Stopwatch;
026import org.nuxeo.common.utils.ExceptionUtils;
027import org.nuxeo.ecm.core.api.CoreInstance;
028import org.nuxeo.ecm.core.api.CoreSession;
029import org.nuxeo.ecm.core.api.DocumentModel;
030import org.nuxeo.ecm.core.api.DocumentNotFoundException;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.api.PathRef;
033import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
034import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
035import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
036import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
037import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
038import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
039import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
040import org.nuxeo.ecm.platform.importer.log.PerfLogger;
041import org.nuxeo.ecm.platform.importer.source.SourceNode;
042import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
043import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045
046import java.io.IOException;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.LinkedBlockingQueue;
054import java.util.concurrent.ThreadFactory;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicInteger;
058
059/**
060 * Generic importer
061 *
062 * @author Thierry Delprat
063 */
064public class GenericMultiThreadedImporter implements ImporterRunner {
065
066    protected static ThreadPoolExecutor importTP;
067
068    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
069
070    protected ImporterThreadingPolicy threadPolicy;
071
072    protected ImporterDocumentModelFactory factory;
073
074    protected SourceNode importSource;
075
076    protected DocumentModel targetContainer;
077
078    protected Integer batchSize = 50;
079
080    protected Integer nbThreads = 5;
081
082    protected Integer transactionTimeout = 0;
083
084    protected ImporterLogger log;
085
086    protected CoreSession session;
087
088    protected String importWritePath;
089
090    protected Boolean skipRootContainerCreation = false;
091
092    protected String jobName;
093
094    protected boolean enablePerfLogging = true;
095
096    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
097
098    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
099
100    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
101
102    protected GenericThreadedImportTask rootImportTask;
103
104    protected final static int DEFAULT_QUEUE_SIZE = 10000;
105
106    protected int queueSize = DEFAULT_QUEUE_SIZE;
107
108    protected String repositoryName;
109
110    protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" };
111
112    public static ThreadPoolExecutor getExecutor() {
113        return importTP;
114    }
115
116    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
117        String tid = Thread.currentThread().getName();
118        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
119    }
120
121    public static synchronized long getCreatedDocsCounter() {
122        long counter = 0;
123        for (String tid : nbCreatedDocsByThreads.keySet()) {
124            Long tCounter = nbCreatedDocsByThreads.get(tid);
125            if (tCounter != null) {
126                counter += tCounter;
127            }
128        }
129        return counter;
130    }
131
132    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
133            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
134        importSource = sourceNode;
135        this.importWritePath = importWritePath;
136        this.log = log;
137        if (batchSize != null) {
138            this.batchSize = batchSize;
139        }
140        if (nbThreads != null) {
141            this.nbThreads = nbThreads;
142        }
143        if (skipRootContainerCreation != null) {
144            this.skipRootContainerCreation = skipRootContainerCreation;
145        }
146    }
147
148    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
149            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
150        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
151    }
152
153    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
154            Integer nbThreads, ImporterLogger log) {
155        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
156    }
157
158    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
159            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
160
161        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
162        this.jobName = jobName;
163        if (jobName != null) {
164            listeners.add(new JobHistoryListener(jobName));
165        }
166    }
167
168    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
169            Integer nbThreads, String jobName, ImporterLogger log) {
170        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
171    }
172
173    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
174        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
175                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
176        repositoryName = configuration.repositoryName;
177    }
178
179    public void addFilter(ImporterFilter filter) {
180        log.debug(String.format(
181                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
182                filter.toString(), this.hashCode(), importSource.getName()));
183        filters.add(filter);
184    }
185
186    public void addListeners(ImporterListener... listeners) {
187        addListeners(Arrays.asList(listeners));
188    }
189
190    public void addListeners(Collection<ImporterListener> listeners) {
191        this.listeners.addAll(listeners);
192    }
193
194    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
195        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
196    }
197
198    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
199        this.importingDocumentFilters.addAll(importingDocumentFilters);
200    }
201
202    @Override
203    public void run() {
204        Exception finalException = null;
205        try {
206            if (!TransactionHelper.isTransactionActive()) {
207                TransactionHelper.startTransaction();
208            }
209            session = CoreInstance.openCoreSessionSystem(repositoryName);
210            for (ImporterFilter filter : filters) {
211                log.debug(String.format(
212                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
213                        filter.toString(), this.hashCode(), importSource.getName()));
214                filter.handleBeforeImport();
215            }
216            if (filters.size() == 0) {
217                log.debug(String.format(
218                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
219                        this.hashCode(), importSource.getName()));
220            }
221            doRun();
222        } catch (Exception e) { // deals with interrupt below
223            ExceptionUtils.checkInterrupt(e);
224            log.error("Task exec failed", e);
225            finalException = e;
226        } finally {
227            for (ImporterFilter filter : filters) {
228                filter.handleAfterImport(finalException);
229            }
230            if (session != null) {
231                session.close();
232                session = null;
233            }
234        }
235    }
236
237    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
238        this.rootImportTask = rootImportTask;
239    }
240
241    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
242            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
243        if (rootImportTask == null) {
244            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
245                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
246        } else {
247            rootImportTask.setInputSource(importSource);
248            rootImportTask.setTargetFolder(targetContainer);
249            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
250            rootImportTask.setRsLogger(log);
251            rootImportTask.setFactory(getFactory());
252            rootImportTask.setThreadPolicy(getThreadPolicy());
253            rootImportTask.setJobName(jobName);
254            rootImportTask.setBatchSize(batchSize);
255        }
256        rootImportTask.addListeners(listeners);
257        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
258        rootImportTask.setTransactionTimeout(transactionTimeout);
259        return rootImportTask;
260    }
261
262    /**
263     * Creates non-daemon threads at normal priority.
264     */
265    public static class NamedThreadFactory implements ThreadFactory {
266
267        private final AtomicInteger threadNumber = new AtomicInteger();
268
269        private final ThreadGroup group;
270
271        private final String prefix;
272
273        public NamedThreadFactory(String prefix) {
274            SecurityManager sm = System.getSecurityManager();
275            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
276            this.prefix = prefix;
277        }
278
279        @Override
280        public Thread newThread(Runnable r) {
281            String name = prefix + threadNumber.incrementAndGet();
282            Thread thread = new Thread(group, r, name);
283            // do not set daemon
284            thread.setPriority(Thread.NORM_PRIORITY);
285            return thread;
286        }
287    }
288
289    protected void doRun() throws IOException {
290
291        targetContainer = getTargetContainer();
292
293        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
294
295        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
296                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
297
298        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
299
300        rootImportTask.setRootTask();
301        long t0 = System.currentTimeMillis();
302
303        notifyBeforeImport();
304
305        importTP.execute(rootImportTask);
306        sleep(200);
307        int activeTasks = importTP.getActiveCount();
308        int oldActiveTasks = 0;
309        long lastLogProgressTime = System.currentTimeMillis();
310        long lastCreatedDocCounter = 0;
311
312        PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null;
313        while (activeTasks > 0) {
314            sleep(500);
315            activeTasks = importTP.getActiveCount();
316            boolean logProgress = false;
317            if (oldActiveTasks != activeTasks) {
318                oldActiveTasks = activeTasks;
319                log.debug("currently " + activeTasks + " active import Threads");
320                logProgress = true;
321
322            }
323            long ti = System.currentTimeMillis();
324            if (ti - lastLogProgressTime > 5000) {
325                logProgress = true;
326            }
327            if (logProgress) {
328                long inbCreatedDocs = getCreatedDocsCounter();
329                long deltaT = ti - lastLogProgressTime;
330                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
331                double imediateSpeed = averageSpeed;
332                if (deltaT > 0) {
333                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
334                }
335                log.info(inbCreatedDocs + " docs created");
336                log.info("average speed = " + averageSpeed + " docs/s");
337                log.info("immediate speed = " + imediateSpeed + " docs/s");
338
339                if (enablePerfLogging) {
340                    Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed };
341                    perfLogger.log(perfData);
342                }
343
344                lastLogProgressTime = ti;
345                lastCreatedDocCounter = inbCreatedDocs;
346            }
347        }
348        stopImportProcrocess();
349        log.info("All Threads terminated");
350        if (enablePerfLogging) {
351            perfLogger.release();
352        }
353        notifyAfterImport();
354
355        long t1 = System.currentTimeMillis();
356        long nbCreatedDocs = getCreatedDocsCounter();
357        log.info(nbCreatedDocs + " docs created");
358        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
359        for (String k : nbCreatedDocsByThreads.keySet()) {
360            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
361        }
362        Stopwatch stopwatch;
363        for (String name : SimonManager.simonNames()) {
364            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
365                continue;
366            }
367            stopwatch = SimonManager.getStopwatch(name);
368            if (stopwatch.getCounter() > 0) {
369                log.info(stopwatch.toString());
370            }
371        }
372
373    }
374
375    protected static void sleep(int millis) {
376        try {
377            Thread.sleep(millis);
378        } catch (InterruptedException e) {
379            // restore interrupt status
380            Thread.currentThread().interrupt();
381            throw new NuxeoException(e);
382        }
383    }
384
385    protected DocumentModel getTargetContainer() {
386        if (targetContainer == null) {
387            targetContainer = createTargetContainer();
388        }
389        return targetContainer;
390    }
391
392    /**
393     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
394     * overridden in subclasses.
395     *
396     * @return
397     */
398    protected DocumentModel createTargetContainer() {
399        try {
400            return session.getDocument(new PathRef(importWritePath));
401        } catch (DocumentNotFoundException e) {
402            log.error(e.getMessage());
403            throw e;
404        }
405    }
406
407    public ImporterThreadingPolicy getThreadPolicy() {
408        if (threadPolicy == null) {
409            threadPolicy = new DefaultMultiThreadingPolicy();
410        }
411        return threadPolicy;
412    }
413
414    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
415        this.threadPolicy = threadPolicy;
416    }
417
418    public ImporterDocumentModelFactory getFactory() {
419        if (factory == null) {
420            factory = new DefaultDocumentModelFactory();
421        }
422        return factory;
423    }
424
425    public void setFactory(ImporterDocumentModelFactory factory) {
426        this.factory = factory;
427    }
428
429    /**
430     * @since 5.9.4
431     */
432    public void setTransactionTimeout(int transactionTimeout) {
433        this.transactionTimeout = transactionTimeout;
434    }
435
436    public void setEnablePerfLogging(boolean enablePerfLogging) {
437        this.enablePerfLogging = enablePerfLogging;
438    }
439
440    public void stopImportProcrocess() {
441        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
442            importTP.shutdownNow();
443        }
444    }
445
446    protected void notifyBeforeImport() {
447        for (ImporterListener listener : listeners) {
448            listener.beforeImport();
449        }
450    }
451
452    protected void notifyAfterImport() {
453        for (ImporterListener listener : listeners) {
454            listener.afterImport();
455        }
456    }
457
458    /**
459     * @since 7.1
460     */
461    public String getRepositoryName() {
462        return repositoryName;
463    }
464
465    /**
466     * @since 7.1
467     */
468    public void setRepositoryName(String repositoryName) {
469        this.repositoryName = repositoryName;
470    }
471
472}