001/*
002 * (C) Copyright 2006-2008 Nuxeo SAS (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Nuxeo - initial API and implementation
016 *
017 * $Id$
018 */
019
020package org.nuxeo.ecm.platform.importer.base;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.javasimon.SimonManager;
036import org.javasimon.Stopwatch;
037import org.nuxeo.common.utils.ExceptionUtils;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentNotFoundException;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.PathRef;
044import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
045import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
046import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
047import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
048import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
049import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
050import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
051import org.nuxeo.ecm.platform.importer.log.PerfLogger;
052import org.nuxeo.ecm.platform.importer.source.SourceNode;
053import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
054import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
055
056/**
057 * Generic importer
058 *
059 * @author Thierry Delprat
060 */
061public class GenericMultiThreadedImporter implements ImporterRunner {
062
063    protected static ThreadPoolExecutor importTP;
064
065    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
066
067    protected ImporterThreadingPolicy threadPolicy;
068
069    protected ImporterDocumentModelFactory factory;
070
071    protected SourceNode importSource;
072
073    protected DocumentModel targetContainer;
074
075    protected Integer batchSize = 50;
076
077    protected Integer nbThreads = 5;
078
079    protected Integer transactionTimeout = 0;
080
081    protected ImporterLogger log;
082
083    protected CoreSession session;
084
085    protected String importWritePath;
086
087    protected Boolean skipRootContainerCreation = false;
088
089    protected String jobName;
090
091    protected boolean enablePerfLogging = true;
092
093    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
094
095    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
096
097    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
098
099    protected GenericThreadedImportTask rootImportTask;
100
101    protected final static int DEFAULT_QUEUE_SIZE = 10000;
102
103    protected int queueSize = DEFAULT_QUEUE_SIZE;
104
105    protected String repositoryName;
106
107    public static ThreadPoolExecutor getExecutor() {
108        return importTP;
109    }
110
111    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
112        String tid = Thread.currentThread().getName();
113        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
114    }
115
116    public static synchronized long getCreatedDocsCounter() {
117        long counter = 0;
118        for (String tid : nbCreatedDocsByThreads.keySet()) {
119            Long tCounter = nbCreatedDocsByThreads.get(tid);
120            if (tCounter != null) {
121                counter += tCounter;
122            }
123        }
124        return counter;
125    }
126
127    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
128            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
129        importSource = sourceNode;
130        this.importWritePath = importWritePath;
131        this.log = log;
132        if (batchSize != null) {
133            this.batchSize = batchSize;
134        }
135        if (nbThreads != null) {
136            this.nbThreads = nbThreads;
137        }
138        if (skipRootContainerCreation != null) {
139            this.skipRootContainerCreation = skipRootContainerCreation;
140        }
141    }
142
143    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
144            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
145        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
146    }
147
148    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
149            Integer nbThreads, ImporterLogger log) {
150        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
151    }
152
153    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
154            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
155
156        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
157        this.jobName = jobName;
158        if (jobName != null) {
159            listeners.add(new JobHistoryListener(jobName));
160        }
161    }
162
163    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
164            Integer nbThreads, String jobName, ImporterLogger log) {
165        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
166    }
167
168    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
169        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
170                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
171        repositoryName = configuration.repositoryName;
172    }
173
174    public void addFilter(ImporterFilter filter) {
175        log.debug(String.format(
176                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
177                filter.toString(), this.hashCode(), importSource.getName()));
178        filters.add(filter);
179    }
180
181    public void addListeners(ImporterListener... listeners) {
182        addListeners(Arrays.asList(listeners));
183    }
184
185    public void addListeners(Collection<ImporterListener> listeners) {
186        this.listeners.addAll(listeners);
187    }
188
189    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
190        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
191    }
192
193    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
194        this.importingDocumentFilters.addAll(importingDocumentFilters);
195    }
196
197    @Override
198    public void run() {
199        Exception finalException = null;
200        try {
201            session = CoreInstance.openCoreSessionSystem(repositoryName);
202            for (ImporterFilter filter : filters) {
203                log.debug(String.format(
204                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
205                        filter.toString(), this.hashCode(), importSource.getName()));
206                filter.handleBeforeImport();
207            }
208            if (filters.size() == 0) {
209                log.debug(String.format(
210                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
211                        this.hashCode(), importSource.getName()));
212            }
213            doRun();
214        } catch (Exception e) { // deals with interrupt below
215            ExceptionUtils.checkInterrupt(e);
216            log.error("Task exec failed", e);
217            finalException = e;
218        } finally {
219            for (ImporterFilter filter : filters) {
220                filter.handleAfterImport(finalException);
221            }
222            if (session != null) {
223                session.close();
224                session = null;
225            }
226        }
227    }
228
229    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
230        this.rootImportTask = rootImportTask;
231    }
232
233    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
234            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
235        if (rootImportTask == null) {
236            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
237                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
238        } else {
239            rootImportTask.setInputSource(importSource);
240            rootImportTask.setTargetFolder(targetContainer);
241            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
242            rootImportTask.setRsLogger(log);
243            rootImportTask.setFactory(getFactory());
244            rootImportTask.setThreadPolicy(getThreadPolicy());
245            rootImportTask.setJobName(jobName);
246            rootImportTask.setBatchSize(batchSize);
247        }
248        rootImportTask.addListeners(listeners);
249        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
250        rootImportTask.setTransactionTimeout(transactionTimeout);
251        return rootImportTask;
252    }
253
254    /**
255     * Creates non-daemon threads at normal priority.
256     */
257    public static class NamedThreadFactory implements ThreadFactory {
258
259        private final AtomicInteger threadNumber = new AtomicInteger();
260
261        private final ThreadGroup group;
262
263        private final String prefix;
264
265        public NamedThreadFactory(String prefix) {
266            SecurityManager sm = System.getSecurityManager();
267            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
268            this.prefix = prefix;
269        }
270
271        @Override
272        public Thread newThread(Runnable r) {
273            String name = prefix + threadNumber.incrementAndGet();
274            Thread thread = new Thread(group, r, name);
275            // do not set daemon
276            thread.setPriority(Thread.NORM_PRIORITY);
277            return thread;
278        }
279    }
280
281    protected void doRun() throws IOException {
282
283        targetContainer = getTargetContainer();
284
285        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
286
287        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
288                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
289
290        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
291
292        rootImportTask.setRootTask();
293        long t0 = System.currentTimeMillis();
294
295        notifyBeforeImport();
296
297        importTP.execute(rootImportTask);
298        sleep(200);
299        int activeTasks = importTP.getActiveCount();
300        int oldActiveTasks = 0;
301        long lastLogProgressTime = System.currentTimeMillis();
302        long lastCreatedDocCounter = 0;
303
304        String[] headers = { "nbDocs", "average", "imediate" };
305        PerfLogger perfLogger = new PerfLogger(headers);
306        while (activeTasks > 0) {
307            sleep(500);
308            activeTasks = importTP.getActiveCount();
309            boolean logProgress = false;
310            if (oldActiveTasks != activeTasks) {
311                oldActiveTasks = activeTasks;
312                log.debug("currently " + activeTasks + " active import Threads");
313                logProgress = true;
314
315            }
316            long ti = System.currentTimeMillis();
317            if (ti - lastLogProgressTime > 5000) {
318                logProgress = true;
319            }
320            if (logProgress) {
321                long inbCreatedDocs = getCreatedDocsCounter();
322                long deltaT = ti - lastLogProgressTime;
323                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
324                double imediateSpeed = averageSpeed;
325                if (deltaT > 0) {
326                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
327                }
328                log.info(inbCreatedDocs + " docs created");
329                log.info("average speed = " + averageSpeed + " docs/s");
330                log.info("immediate speed = " + imediateSpeed + " docs/s");
331
332                if (enablePerfLogging) {
333                    Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed };
334                    perfLogger.log(perfData);
335                }
336
337                lastLogProgressTime = ti;
338                lastCreatedDocCounter = inbCreatedDocs;
339            }
340        }
341        stopImportProcrocess();
342        log.info("All Threads terminated");
343        perfLogger.release();
344        notifyAfterImport();
345
346        long t1 = System.currentTimeMillis();
347        long nbCreatedDocs = getCreatedDocsCounter();
348        log.info(nbCreatedDocs + " docs created");
349        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
350        for (String k : nbCreatedDocsByThreads.keySet()) {
351            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
352        }
353        Stopwatch stopwatch;
354        for (String name : SimonManager.simonNames()) {
355            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
356                continue;
357            }
358            stopwatch = SimonManager.getStopwatch(name);
359            if (stopwatch.getCounter() > 0) {
360                log.info(stopwatch.toString());
361            }
362        }
363
364    }
365
366    protected static void sleep(int millis) {
367        try {
368            Thread.sleep(millis);
369        } catch (InterruptedException e) {
370            // restore interrupt status
371            Thread.currentThread().interrupt();
372            throw new NuxeoException(e);
373        }
374    }
375
376    protected DocumentModel getTargetContainer() {
377        if (targetContainer == null) {
378            targetContainer = createTargetContainer();
379        }
380        return targetContainer;
381    }
382
383    /**
384     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
385     * overridden in subclasses.
386     *
387     * @return
388     */
389    protected DocumentModel createTargetContainer() {
390        try {
391            return session.getDocument(new PathRef(importWritePath));
392        } catch (DocumentNotFoundException e) {
393            log.error(e.getMessage());
394            throw e;
395        }
396    }
397
398    public ImporterThreadingPolicy getThreadPolicy() {
399        if (threadPolicy == null) {
400            threadPolicy = new DefaultMultiThreadingPolicy();
401        }
402        return threadPolicy;
403    }
404
405    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
406        this.threadPolicy = threadPolicy;
407    }
408
409    public ImporterDocumentModelFactory getFactory() {
410        if (factory == null) {
411            factory = new DefaultDocumentModelFactory();
412        }
413        return factory;
414    }
415
416    public void setFactory(ImporterDocumentModelFactory factory) {
417        this.factory = factory;
418    }
419
420    /**
421     * @since 5.9.4
422     */
423    public void setTransactionTimeout(int transactionTimeout) {
424        this.transactionTimeout = transactionTimeout;
425    }
426
427    public void setEnablePerfLogging(boolean enablePerfLogging) {
428        this.enablePerfLogging = enablePerfLogging;
429    }
430
431    public void stopImportProcrocess() {
432        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
433            importTP.shutdownNow();
434        }
435    }
436
437    protected void notifyBeforeImport() {
438        for (ImporterListener listener : listeners) {
439            listener.beforeImport();
440        }
441    }
442
443    protected void notifyAfterImport() {
444        for (ImporterListener listener : listeners) {
445            listener.afterImport();
446        }
447    }
448
449    /**
450     * @since 7.1
451     */
452    public String getRepositoryName() {
453        return repositoryName;
454    }
455
456    /**
457     * @since 7.1
458     */
459    public void setRepositoryName(String repositoryName) {
460        this.repositoryName = repositoryName;
461    }
462
463}