001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import org.javasimon.SimonManager;
025import org.javasimon.Stopwatch;
026import org.nuxeo.common.utils.ExceptionUtils;
027import org.nuxeo.ecm.core.api.CoreInstance;
028import org.nuxeo.ecm.core.api.CoreSession;
029import org.nuxeo.ecm.core.api.DocumentModel;
030import org.nuxeo.ecm.core.api.DocumentNotFoundException;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.api.PathRef;
033import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
034import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
035import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
036import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
037import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
038import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
039import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
040import org.nuxeo.ecm.platform.importer.log.PerfLogger;
041import org.nuxeo.ecm.platform.importer.source.SourceNode;
042import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
043import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045
046import java.io.IOException;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.LinkedBlockingQueue;
054import java.util.concurrent.ThreadFactory;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicInteger;
058
059/**
060 * Generic importer
061 *
062 * @author Thierry Delprat
063 */
064public class GenericMultiThreadedImporter implements ImporterRunner {
065
066    protected static ThreadPoolExecutor importTP;
067
068    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<>();
069
070    protected ImporterThreadingPolicy threadPolicy;
071
072    protected ImporterDocumentModelFactory factory;
073
074    protected SourceNode importSource;
075
076    protected DocumentModel targetContainer;
077
078    protected Integer batchSize = 50;
079
080    protected Integer nbThreads = 5;
081
082    protected Integer transactionTimeout = 0;
083
084    protected ImporterLogger log;
085
086    protected CoreSession session;
087
088    protected String importWritePath;
089
090    protected Boolean skipRootContainerCreation = false;
091
092    protected String jobName;
093
094    protected boolean enablePerfLogging = true;
095
096    protected List<ImporterFilter> filters = new ArrayList<>();
097
098    protected List<ImporterListener> listeners = new ArrayList<>();
099
100    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<>();
101
102    protected GenericThreadedImportTask rootImportTask;
103
104    protected final static int DEFAULT_QUEUE_SIZE = 10000;
105
106    protected int queueSize = DEFAULT_QUEUE_SIZE;
107
108    protected String repositoryName;
109
110    protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" };
111
112    public static ThreadPoolExecutor getExecutor() {
113        return importTP;
114    }
115
116    public static void addCreatedDoc(String taskId, long nbDocs) {
117        String tid = Thread.currentThread().getName();
118        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
119    }
120
121    public static long getCreatedDocsCounter() {
122        long counter = 0;
123        for (String tid : nbCreatedDocsByThreads.keySet()) {
124            Long tCounter = nbCreatedDocsByThreads.get(tid);
125            if (tCounter != null) {
126                counter += tCounter;
127            }
128        }
129        return counter;
130    }
131
132    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
133            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
134        importSource = sourceNode;
135        this.importWritePath = importWritePath;
136        this.log = log;
137        if (batchSize != null) {
138            this.batchSize = batchSize;
139        }
140        if (nbThreads != null) {
141            this.nbThreads = nbThreads;
142        }
143        if (skipRootContainerCreation != null) {
144            this.skipRootContainerCreation = skipRootContainerCreation;
145        }
146    }
147
148    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
149            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
150        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
151    }
152
153    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
154            Integer nbThreads, ImporterLogger log) {
155        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
156    }
157
158    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
159            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
160
161        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
162        this.jobName = jobName;
163        if (jobName != null) {
164            listeners.add(new JobHistoryListener(jobName));
165        }
166    }
167
168    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
169            Integer nbThreads, String jobName, ImporterLogger log) {
170        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
171    }
172
173    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
174        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
175                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
176        repositoryName = configuration.repositoryName;
177    }
178
179    public void addFilter(ImporterFilter filter) {
180        log.debug(String.format(
181                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
182                filter.toString(), this.hashCode(), importSource.getName()));
183        filters.add(filter);
184    }
185
186    public void addListeners(ImporterListener... listeners) {
187        addListeners(Arrays.asList(listeners));
188    }
189
190    public void addListeners(Collection<ImporterListener> listeners) {
191        this.listeners.addAll(listeners);
192    }
193
194    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
195        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
196    }
197
198    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
199        this.importingDocumentFilters.addAll(importingDocumentFilters);
200    }
201
202    @Override
203    public void run() {
204        Exception finalException = null;
205        if (!TransactionHelper.isTransactionActive()) {
206            TransactionHelper.startTransaction();
207        }
208        try {
209            session = CoreInstance.getCoreSessionSystem(repositoryName);
210            for (ImporterFilter filter : filters) {
211                log.debug(String.format(
212                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
213                        filter.toString(), this.hashCode(), importSource.getName()));
214                filter.handleBeforeImport();
215            }
216            if (filters.size() == 0) {
217                log.debug(String.format(
218                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
219                        this.hashCode(), importSource.getName()));
220            }
221            doRun();
222        } catch (Exception e) { // deals with interrupt below
223            ExceptionUtils.checkInterrupt(e);
224            log.error("Task exec failed", e);
225            finalException = e;
226        } finally {
227            for (ImporterFilter filter : filters) {
228                filter.handleAfterImport(finalException);
229            }
230        }
231    }
232
233    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
234        this.rootImportTask = rootImportTask;
235    }
236
237    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
238            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
239        if (rootImportTask == null) {
240            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
241                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
242        } else {
243            rootImportTask.setInputSource(importSource);
244            rootImportTask.setTargetFolder(targetContainer);
245            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
246            rootImportTask.setRsLogger(log);
247            rootImportTask.setFactory(getFactory());
248            rootImportTask.setThreadPolicy(getThreadPolicy());
249            rootImportTask.setJobName(jobName);
250            rootImportTask.setBatchSize(batchSize);
251        }
252        rootImportTask.addListeners(listeners);
253        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
254        rootImportTask.setTransactionTimeout(transactionTimeout);
255        return rootImportTask;
256    }
257
258    /**
259     * Creates non-daemon threads at normal priority.
260     */
261    public static class NamedThreadFactory implements ThreadFactory {
262
263        private final AtomicInteger threadNumber = new AtomicInteger();
264
265        private final ThreadGroup group;
266
267        private final String prefix;
268
269        public NamedThreadFactory(String prefix) {
270            SecurityManager sm = System.getSecurityManager();
271            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
272            this.prefix = prefix;
273        }
274
275        @Override
276        public Thread newThread(Runnable r) {
277            String name = prefix + threadNumber.incrementAndGet();
278            Thread thread = new Thread(group, r, name);
279            // do not set daemon
280            thread.setPriority(Thread.NORM_PRIORITY);
281            return thread;
282        }
283    }
284
285    protected void doRun() throws IOException {
286
287        targetContainer = getTargetContainer();
288
289        nbCreatedDocsByThreads.clear();
290
291        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
292                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
293
294        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
295
296        rootImportTask.setRootTask();
297        long t0 = System.currentTimeMillis();
298
299        notifyBeforeImport();
300
301        importTP.execute(rootImportTask);
302        sleep(200);
303        int activeTasks = importTP.getActiveCount();
304        int oldActiveTasks = 0;
305        long lastLogProgressTime = System.currentTimeMillis();
306        long lastCreatedDocCounter = 0;
307
308        PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null;
309        while (activeTasks > 0) {
310            sleep(500);
311            activeTasks = importTP.getActiveCount();
312            boolean logProgress = false;
313            if (oldActiveTasks != activeTasks) {
314                oldActiveTasks = activeTasks;
315                log.debug("currently " + activeTasks + " active import Threads");
316                logProgress = true;
317
318            }
319            long ti = System.currentTimeMillis();
320            if (ti - lastLogProgressTime > 5000) {
321                logProgress = true;
322            }
323            if (logProgress) {
324                long inbCreatedDocs = getCreatedDocsCounter();
325                long deltaT = ti - lastLogProgressTime;
326                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
327                double imediateSpeed = averageSpeed;
328                if (deltaT > 0) {
329                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
330                }
331                log.info(inbCreatedDocs + " docs created");
332                log.info("average speed = " + averageSpeed + " docs/s");
333                log.info("immediate speed = " + imediateSpeed + " docs/s");
334
335                if (enablePerfLogging) {
336                    Double[] perfData = { Double.valueOf(inbCreatedDocs), averageSpeed, imediateSpeed };
337                    perfLogger.log(perfData);
338                }
339
340                lastLogProgressTime = ti;
341                lastCreatedDocCounter = inbCreatedDocs;
342            }
343        }
344        stopImportProcrocess();
345        log.info("All Threads terminated");
346        if (enablePerfLogging) {
347            perfLogger.release();
348        }
349        notifyAfterImport();
350
351        long t1 = System.currentTimeMillis();
352        long nbCreatedDocs = getCreatedDocsCounter();
353        log.info(nbCreatedDocs + " docs created");
354        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
355        for (String k : nbCreatedDocsByThreads.keySet()) {
356            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
357        }
358        Stopwatch stopwatch;
359        for (String name : SimonManager.simonNames()) {
360            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
361                continue;
362            }
363            stopwatch = SimonManager.getStopwatch(name);
364            if (stopwatch.getCounter() > 0) {
365                log.info(stopwatch.toString());
366            }
367        }
368
369    }
370
371    protected static void sleep(int millis) {
372        try {
373            Thread.sleep(millis);
374        } catch (InterruptedException e) {
375            Thread.currentThread().interrupt();
376            throw new NuxeoException(e);
377        }
378    }
379
380    protected DocumentModel getTargetContainer() {
381        if (targetContainer == null) {
382            targetContainer = createTargetContainer();
383        }
384        return targetContainer;
385    }
386
387    /**
388     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
389     * overridden in subclasses.
390     */
391    protected DocumentModel createTargetContainer() {
392        try {
393            return session.getDocument(new PathRef(importWritePath));
394        } catch (DocumentNotFoundException e) {
395            log.error(e.getMessage());
396            throw e;
397        }
398    }
399
400    public ImporterThreadingPolicy getThreadPolicy() {
401        if (threadPolicy == null) {
402            threadPolicy = new DefaultMultiThreadingPolicy();
403        }
404        return threadPolicy;
405    }
406
407    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
408        this.threadPolicy = threadPolicy;
409    }
410
411    public ImporterDocumentModelFactory getFactory() {
412        if (factory == null) {
413            factory = new DefaultDocumentModelFactory();
414        }
415        return factory;
416    }
417
418    public void setFactory(ImporterDocumentModelFactory factory) {
419        this.factory = factory;
420    }
421
422    /**
423     * @since 5.9.4
424     */
425    public void setTransactionTimeout(int transactionTimeout) {
426        this.transactionTimeout = transactionTimeout;
427    }
428
429    public void setEnablePerfLogging(boolean enablePerfLogging) {
430        this.enablePerfLogging = enablePerfLogging;
431    }
432
433    @Override
434    public void stopImportProcrocess() {
435        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
436            importTP.shutdownNow();
437        }
438    }
439
440    protected void notifyBeforeImport() {
441        for (ImporterListener listener : listeners) {
442            listener.beforeImport();
443        }
444    }
445
446    protected void notifyAfterImport() {
447        for (ImporterListener listener : listeners) {
448            listener.afterImport();
449        }
450    }
451
452    /**
453     * @since 7.1
454     */
455    public String getRepositoryName() {
456        return repositoryName;
457    }
458
459    /**
460     * @since 7.1
461     */
462    public void setRepositoryName(String repositoryName) {
463        this.repositoryName = repositoryName;
464    }
465
466}