001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import org.javasimon.SimonManager;
025import org.javasimon.Stopwatch;
026import org.nuxeo.common.utils.ExceptionUtils;
027import org.nuxeo.ecm.core.api.CoreInstance;
028import org.nuxeo.ecm.core.api.CoreSession;
029import org.nuxeo.ecm.core.api.DocumentModel;
030import org.nuxeo.ecm.core.api.DocumentNotFoundException;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.api.PathRef;
033import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
034import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
035import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
036import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
037import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
038import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
039import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
040import org.nuxeo.ecm.platform.importer.log.PerfLogger;
041import org.nuxeo.ecm.platform.importer.source.SourceNode;
042import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
043import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045
046import java.io.IOException;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.LinkedBlockingQueue;
054import java.util.concurrent.ThreadFactory;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicInteger;
058
059/**
060 * Generic importer
061 *
062 * @author Thierry Delprat
063 */
064public class GenericMultiThreadedImporter implements ImporterRunner {
065
066    protected static ThreadPoolExecutor importTP;
067
068    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
069
070    protected ImporterThreadingPolicy threadPolicy;
071
072    protected ImporterDocumentModelFactory factory;
073
074    protected SourceNode importSource;
075
076    protected DocumentModel targetContainer;
077
078    protected Integer batchSize = 50;
079
080    protected Integer nbThreads = 5;
081
082    protected Integer transactionTimeout = 0;
083
084    protected ImporterLogger log;
085
086    protected CoreSession session;
087
088    protected String importWritePath;
089
090    protected Boolean skipRootContainerCreation = false;
091
092    protected String jobName;
093
094    protected boolean enablePerfLogging = true;
095
096    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
097
098    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
099
100    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
101
102    protected GenericThreadedImportTask rootImportTask;
103
104    protected final static int DEFAULT_QUEUE_SIZE = 10000;
105
106    protected int queueSize = DEFAULT_QUEUE_SIZE;
107
108    protected String repositoryName;
109
110    public static ThreadPoolExecutor getExecutor() {
111        return importTP;
112    }
113
114    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
115        String tid = Thread.currentThread().getName();
116        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
117    }
118
119    public static synchronized long getCreatedDocsCounter() {
120        long counter = 0;
121        for (String tid : nbCreatedDocsByThreads.keySet()) {
122            Long tCounter = nbCreatedDocsByThreads.get(tid);
123            if (tCounter != null) {
124                counter += tCounter;
125            }
126        }
127        return counter;
128    }
129
130    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
131            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
132        importSource = sourceNode;
133        this.importWritePath = importWritePath;
134        this.log = log;
135        if (batchSize != null) {
136            this.batchSize = batchSize;
137        }
138        if (nbThreads != null) {
139            this.nbThreads = nbThreads;
140        }
141        if (skipRootContainerCreation != null) {
142            this.skipRootContainerCreation = skipRootContainerCreation;
143        }
144    }
145
146    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
147            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
148        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
149    }
150
151    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
152            Integer nbThreads, ImporterLogger log) {
153        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
154    }
155
156    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
157            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
158
159        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
160        this.jobName = jobName;
161        if (jobName != null) {
162            listeners.add(new JobHistoryListener(jobName));
163        }
164    }
165
166    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
167            Integer nbThreads, String jobName, ImporterLogger log) {
168        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
169    }
170
171    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
172        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
173                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
174        repositoryName = configuration.repositoryName;
175    }
176
177    public void addFilter(ImporterFilter filter) {
178        log.debug(String.format(
179                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
180                filter.toString(), this.hashCode(), importSource.getName()));
181        filters.add(filter);
182    }
183
184    public void addListeners(ImporterListener... listeners) {
185        addListeners(Arrays.asList(listeners));
186    }
187
188    public void addListeners(Collection<ImporterListener> listeners) {
189        this.listeners.addAll(listeners);
190    }
191
192    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
193        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
194    }
195
196    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
197        this.importingDocumentFilters.addAll(importingDocumentFilters);
198    }
199
200    @Override
201    public void run() {
202        Exception finalException = null;
203        try {
204            if (!TransactionHelper.isTransactionActive()) {
205                TransactionHelper.startTransaction();
206            }
207            session = CoreInstance.openCoreSessionSystem(repositoryName);
208            for (ImporterFilter filter : filters) {
209                log.debug(String.format(
210                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
211                        filter.toString(), this.hashCode(), importSource.getName()));
212                filter.handleBeforeImport();
213            }
214            if (filters.size() == 0) {
215                log.debug(String.format(
216                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
217                        this.hashCode(), importSource.getName()));
218            }
219            doRun();
220        } catch (Exception e) { // deals with interrupt below
221            ExceptionUtils.checkInterrupt(e);
222            log.error("Task exec failed", e);
223            finalException = e;
224        } finally {
225            for (ImporterFilter filter : filters) {
226                filter.handleAfterImport(finalException);
227            }
228            if (session != null) {
229                session.close();
230                session = null;
231            }
232        }
233    }
234
235    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
236        this.rootImportTask = rootImportTask;
237    }
238
239    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
240            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
241        if (rootImportTask == null) {
242            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
243                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
244        } else {
245            rootImportTask.setInputSource(importSource);
246            rootImportTask.setTargetFolder(targetContainer);
247            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
248            rootImportTask.setRsLogger(log);
249            rootImportTask.setFactory(getFactory());
250            rootImportTask.setThreadPolicy(getThreadPolicy());
251            rootImportTask.setJobName(jobName);
252            rootImportTask.setBatchSize(batchSize);
253        }
254        rootImportTask.addListeners(listeners);
255        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
256        rootImportTask.setTransactionTimeout(transactionTimeout);
257        return rootImportTask;
258    }
259
260    /**
261     * Creates non-daemon threads at normal priority.
262     */
263    public static class NamedThreadFactory implements ThreadFactory {
264
265        private final AtomicInteger threadNumber = new AtomicInteger();
266
267        private final ThreadGroup group;
268
269        private final String prefix;
270
271        public NamedThreadFactory(String prefix) {
272            SecurityManager sm = System.getSecurityManager();
273            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
274            this.prefix = prefix;
275        }
276
277        @Override
278        public Thread newThread(Runnable r) {
279            String name = prefix + threadNumber.incrementAndGet();
280            Thread thread = new Thread(group, r, name);
281            // do not set daemon
282            thread.setPriority(Thread.NORM_PRIORITY);
283            return thread;
284        }
285    }
286
287    protected void doRun() throws IOException {
288
289        targetContainer = getTargetContainer();
290
291        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
292
293        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
294                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
295
296        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
297
298        rootImportTask.setRootTask();
299        long t0 = System.currentTimeMillis();
300
301        notifyBeforeImport();
302
303        importTP.execute(rootImportTask);
304        sleep(200);
305        int activeTasks = importTP.getActiveCount();
306        int oldActiveTasks = 0;
307        long lastLogProgressTime = System.currentTimeMillis();
308        long lastCreatedDocCounter = 0;
309
310        String[] headers = { "nbDocs", "average", "imediate" };
311        PerfLogger perfLogger = new PerfLogger(headers);
312        while (activeTasks > 0) {
313            sleep(500);
314            activeTasks = importTP.getActiveCount();
315            boolean logProgress = false;
316            if (oldActiveTasks != activeTasks) {
317                oldActiveTasks = activeTasks;
318                log.debug("currently " + activeTasks + " active import Threads");
319                logProgress = true;
320
321            }
322            long ti = System.currentTimeMillis();
323            if (ti - lastLogProgressTime > 5000) {
324                logProgress = true;
325            }
326            if (logProgress) {
327                long inbCreatedDocs = getCreatedDocsCounter();
328                long deltaT = ti - lastLogProgressTime;
329                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
330                double imediateSpeed = averageSpeed;
331                if (deltaT > 0) {
332                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
333                }
334                log.info(inbCreatedDocs + " docs created");
335                log.info("average speed = " + averageSpeed + " docs/s");
336                log.info("immediate speed = " + imediateSpeed + " docs/s");
337
338                if (enablePerfLogging) {
339                    Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed };
340                    perfLogger.log(perfData);
341                }
342
343                lastLogProgressTime = ti;
344                lastCreatedDocCounter = inbCreatedDocs;
345            }
346        }
347        stopImportProcrocess();
348        log.info("All Threads terminated");
349        perfLogger.release();
350        notifyAfterImport();
351
352        long t1 = System.currentTimeMillis();
353        long nbCreatedDocs = getCreatedDocsCounter();
354        log.info(nbCreatedDocs + " docs created");
355        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
356        for (String k : nbCreatedDocsByThreads.keySet()) {
357            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
358        }
359        Stopwatch stopwatch;
360        for (String name : SimonManager.simonNames()) {
361            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
362                continue;
363            }
364            stopwatch = SimonManager.getStopwatch(name);
365            if (stopwatch.getCounter() > 0) {
366                log.info(stopwatch.toString());
367            }
368        }
369
370    }
371
372    protected static void sleep(int millis) {
373        try {
374            Thread.sleep(millis);
375        } catch (InterruptedException e) {
376            // restore interrupt status
377            Thread.currentThread().interrupt();
378            throw new NuxeoException(e);
379        }
380    }
381
382    protected DocumentModel getTargetContainer() {
383        if (targetContainer == null) {
384            targetContainer = createTargetContainer();
385        }
386        return targetContainer;
387    }
388
389    /**
390     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
391     * overridden in subclasses.
392     *
393     * @return
394     */
395    protected DocumentModel createTargetContainer() {
396        try {
397            return session.getDocument(new PathRef(importWritePath));
398        } catch (DocumentNotFoundException e) {
399            log.error(e.getMessage());
400            throw e;
401        }
402    }
403
404    public ImporterThreadingPolicy getThreadPolicy() {
405        if (threadPolicy == null) {
406            threadPolicy = new DefaultMultiThreadingPolicy();
407        }
408        return threadPolicy;
409    }
410
411    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
412        this.threadPolicy = threadPolicy;
413    }
414
415    public ImporterDocumentModelFactory getFactory() {
416        if (factory == null) {
417            factory = new DefaultDocumentModelFactory();
418        }
419        return factory;
420    }
421
422    public void setFactory(ImporterDocumentModelFactory factory) {
423        this.factory = factory;
424    }
425
426    /**
427     * @since 5.9.4
428     */
429    public void setTransactionTimeout(int transactionTimeout) {
430        this.transactionTimeout = transactionTimeout;
431    }
432
433    public void setEnablePerfLogging(boolean enablePerfLogging) {
434        this.enablePerfLogging = enablePerfLogging;
435    }
436
437    public void stopImportProcrocess() {
438        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
439            importTP.shutdownNow();
440        }
441    }
442
443    protected void notifyBeforeImport() {
444        for (ImporterListener listener : listeners) {
445            listener.beforeImport();
446        }
447    }
448
449    protected void notifyAfterImport() {
450        for (ImporterListener listener : listeners) {
451            listener.afterImport();
452        }
453    }
454
455    /**
456     * @since 7.1
457     */
458    public String getRepositoryName() {
459        return repositoryName;
460    }
461
462    /**
463     * @since 7.1
464     */
465    public void setRepositoryName(String repositoryName) {
466        this.repositoryName = repositoryName;
467    }
468
469}