001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.platform.importer.base;
023
024import org.javasimon.SimonManager;
025import org.javasimon.Stopwatch;
026import org.nuxeo.common.utils.ExceptionUtils;
027import org.nuxeo.ecm.core.api.CloseableCoreSession;
028import org.nuxeo.ecm.core.api.CoreInstance;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.DocumentNotFoundException;
032import org.nuxeo.ecm.core.api.NuxeoException;
033import org.nuxeo.ecm.core.api.PathRef;
034import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
035import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
036import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
037import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
038import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
039import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
040import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
041import org.nuxeo.ecm.platform.importer.log.PerfLogger;
042import org.nuxeo.ecm.platform.importer.source.SourceNode;
043import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
044import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047import java.io.IOException;
048import java.util.ArrayList;
049import java.util.Arrays;
050import java.util.Collection;
051import java.util.List;
052import java.util.Map;
053import java.util.concurrent.ConcurrentHashMap;
054import java.util.concurrent.LinkedBlockingQueue;
055import java.util.concurrent.ThreadFactory;
056import java.util.concurrent.ThreadPoolExecutor;
057import java.util.concurrent.TimeUnit;
058import java.util.concurrent.atomic.AtomicInteger;
059
060/**
061 * Generic importer
062 *
063 * @author Thierry Delprat
064 */
065public class GenericMultiThreadedImporter implements ImporterRunner {
066
067    protected static ThreadPoolExecutor importTP;
068
069    protected static Map<String, Long> nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
070
071    protected ImporterThreadingPolicy threadPolicy;
072
073    protected ImporterDocumentModelFactory factory;
074
075    protected SourceNode importSource;
076
077    protected DocumentModel targetContainer;
078
079    protected Integer batchSize = 50;
080
081    protected Integer nbThreads = 5;
082
083    protected Integer transactionTimeout = 0;
084
085    protected ImporterLogger log;
086
087    protected CoreSession session;
088
089    protected String importWritePath;
090
091    protected Boolean skipRootContainerCreation = false;
092
093    protected String jobName;
094
095    protected boolean enablePerfLogging = true;
096
097    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
098
099    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
100
101    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
102
103    protected GenericThreadedImportTask rootImportTask;
104
105    protected final static int DEFAULT_QUEUE_SIZE = 10000;
106
107    protected int queueSize = DEFAULT_QUEUE_SIZE;
108
109    protected String repositoryName;
110
111    protected static final String[] PERF_HEADERS = { "nbDocs", "average", "imediate" };
112
113    public static ThreadPoolExecutor getExecutor() {
114        return importTP;
115    }
116
117    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
118        String tid = Thread.currentThread().getName();
119        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
120    }
121
122    public static synchronized long getCreatedDocsCounter() {
123        long counter = 0;
124        for (String tid : nbCreatedDocsByThreads.keySet()) {
125            Long tCounter = nbCreatedDocsByThreads.get(tid);
126            if (tCounter != null) {
127                counter += tCounter;
128            }
129        }
130        return counter;
131    }
132
133    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
134            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log, int queueSize) {
135        importSource = sourceNode;
136        this.importWritePath = importWritePath;
137        this.log = log;
138        if (batchSize != null) {
139            this.batchSize = batchSize;
140        }
141        if (nbThreads != null) {
142            this.nbThreads = nbThreads;
143        }
144        if (skipRootContainerCreation != null) {
145            this.skipRootContainerCreation = skipRootContainerCreation;
146        }
147    }
148
149    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
150            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) {
151        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log, DEFAULT_QUEUE_SIZE);
152    }
153
154    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
155            Integer nbThreads, ImporterLogger log) {
156        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
157    }
158
159    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath,
160            Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) {
161
162        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
163        this.jobName = jobName;
164        if (jobName != null) {
165            listeners.add(new JobHistoryListener(jobName));
166        }
167    }
168
169    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize,
170            Integer nbThreads, String jobName, ImporterLogger log) {
171        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
172    }
173
174    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) {
175        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation,
176                configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
177        repositoryName = configuration.repositoryName;
178    }
179
180    public void addFilter(ImporterFilter filter) {
181        log.debug(String.format(
182                "Filter with %s, was added on the importer with the hash code %s. The source node name is %s",
183                filter.toString(), this.hashCode(), importSource.getName()));
184        filters.add(filter);
185    }
186
187    public void addListeners(ImporterListener... listeners) {
188        addListeners(Arrays.asList(listeners));
189    }
190
191    public void addListeners(Collection<ImporterListener> listeners) {
192        this.listeners.addAll(listeners);
193    }
194
195    public void addImportingDocumentFilters(ImportingDocumentFilter... importingDocumentFilters) {
196        addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
197    }
198
199    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
200        this.importingDocumentFilters.addAll(importingDocumentFilters);
201    }
202
203    @Override
204    public void run() {
205        Exception finalException = null;
206        if (!TransactionHelper.isTransactionActive()) {
207            TransactionHelper.startTransaction();
208        }
209        try (CloseableCoreSession closeableCoreSession = CoreInstance.openCoreSessionSystem(repositoryName)) {
210            session = closeableCoreSession;
211            for (ImporterFilter filter : filters) {
212                log.debug(String.format(
213                        "Running filter with %s, on the importer with the hash code %s. The source node name is %s",
214                        filter.toString(), this.hashCode(), importSource.getName()));
215                filter.handleBeforeImport();
216            }
217            if (filters.size() == 0) {
218                log.debug(String.format(
219                        "No filters are registered on the importer with hash code %s, while importing the source node with name %s",
220                        this.hashCode(), importSource.getName()));
221            }
222            doRun();
223        } catch (Exception e) { // deals with interrupt below
224            ExceptionUtils.checkInterrupt(e);
225            log.error("Task exec failed", e);
226            finalException = e;
227        } finally {
228            for (ImporterFilter filter : filters) {
229                filter.handleAfterImport(finalException);
230            }
231            session = null;
232        }
233    }
234
235    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
236        this.rootImportTask = rootImportTask;
237    }
238
239    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer,
240            boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) {
241        if (rootImportTask == null) {
242            setRootImportTask(new GenericThreadedImportTask(repositoryName, importSource, targetContainer,
243                    skipRootContainerCreation, log, batchSize, getFactory(), getThreadPolicy(), jobName));
244        } else {
245            rootImportTask.setInputSource(importSource);
246            rootImportTask.setTargetFolder(targetContainer);
247            rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
248            rootImportTask.setRsLogger(log);
249            rootImportTask.setFactory(getFactory());
250            rootImportTask.setThreadPolicy(getThreadPolicy());
251            rootImportTask.setJobName(jobName);
252            rootImportTask.setBatchSize(batchSize);
253        }
254        rootImportTask.addListeners(listeners);
255        rootImportTask.addImportingDocumentFilters(importingDocumentFilters);
256        rootImportTask.setTransactionTimeout(transactionTimeout);
257        return rootImportTask;
258    }
259
260    /**
261     * Creates non-daemon threads at normal priority.
262     */
263    public static class NamedThreadFactory implements ThreadFactory {
264
265        private final AtomicInteger threadNumber = new AtomicInteger();
266
267        private final ThreadGroup group;
268
269        private final String prefix;
270
271        public NamedThreadFactory(String prefix) {
272            SecurityManager sm = System.getSecurityManager();
273            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
274            this.prefix = prefix;
275        }
276
277        @Override
278        public Thread newThread(Runnable r) {
279            String name = prefix + threadNumber.incrementAndGet();
280            Thread thread = new Thread(group, r, name);
281            // do not set daemon
282            thread.setPriority(Thread.NORM_PRIORITY);
283            return thread;
284        }
285    }
286
287    protected void doRun() throws IOException {
288
289        targetContainer = getTargetContainer();
290
291        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
292
293        importTP = new ThreadPoolExecutor(nbThreads, nbThreads, 500L, TimeUnit.MILLISECONDS,
294                new LinkedBlockingQueue<Runnable>(queueSize), new NamedThreadFactory("Nuxeo-Importer-"));
295
296        initRootTask(importSource, targetContainer, skipRootContainerCreation, log, batchSize, jobName);
297
298        rootImportTask.setRootTask();
299        long t0 = System.currentTimeMillis();
300
301        notifyBeforeImport();
302
303        importTP.execute(rootImportTask);
304        sleep(200);
305        int activeTasks = importTP.getActiveCount();
306        int oldActiveTasks = 0;
307        long lastLogProgressTime = System.currentTimeMillis();
308        long lastCreatedDocCounter = 0;
309
310        PerfLogger perfLogger = enablePerfLogging ? new PerfLogger(PERF_HEADERS) : null;
311        while (activeTasks > 0) {
312            sleep(500);
313            activeTasks = importTP.getActiveCount();
314            boolean logProgress = false;
315            if (oldActiveTasks != activeTasks) {
316                oldActiveTasks = activeTasks;
317                log.debug("currently " + activeTasks + " active import Threads");
318                logProgress = true;
319
320            }
321            long ti = System.currentTimeMillis();
322            if (ti - lastLogProgressTime > 5000) {
323                logProgress = true;
324            }
325            if (logProgress) {
326                long inbCreatedDocs = getCreatedDocsCounter();
327                long deltaT = ti - lastLogProgressTime;
328                double averageSpeed = 1000 * ((float) (inbCreatedDocs) / (ti - t0));
329                double imediateSpeed = averageSpeed;
330                if (deltaT > 0) {
331                    imediateSpeed = 1000 * ((float) (inbCreatedDocs - lastCreatedDocCounter) / (deltaT));
332                }
333                log.info(inbCreatedDocs + " docs created");
334                log.info("average speed = " + averageSpeed + " docs/s");
335                log.info("immediate speed = " + imediateSpeed + " docs/s");
336
337                if (enablePerfLogging) {
338                    Double[] perfData = { new Double(inbCreatedDocs), averageSpeed, imediateSpeed };
339                    perfLogger.log(perfData);
340                }
341
342                lastLogProgressTime = ti;
343                lastCreatedDocCounter = inbCreatedDocs;
344            }
345        }
346        stopImportProcrocess();
347        log.info("All Threads terminated");
348        if (enablePerfLogging) {
349            perfLogger.release();
350        }
351        notifyAfterImport();
352
353        long t1 = System.currentTimeMillis();
354        long nbCreatedDocs = getCreatedDocsCounter();
355        log.info(nbCreatedDocs + " docs created");
356        log.info(1000 * ((float) (nbCreatedDocs) / (t1 - t0)) + " docs/s");
357        for (String k : nbCreatedDocsByThreads.keySet()) {
358            log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
359        }
360        Stopwatch stopwatch;
361        for (String name : SimonManager.simonNames()) {
362            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer")) {
363                continue;
364            }
365            stopwatch = SimonManager.getStopwatch(name);
366            if (stopwatch.getCounter() > 0) {
367                log.info(stopwatch.toString());
368            }
369        }
370
371    }
372
373    protected static void sleep(int millis) {
374        try {
375            Thread.sleep(millis);
376        } catch (InterruptedException e) {
377            Thread.currentThread().interrupt();
378            throw new NuxeoException(e);
379        }
380    }
381
382    protected DocumentModel getTargetContainer() {
383        if (targetContainer == null) {
384            targetContainer = createTargetContainer();
385        }
386        return targetContainer;
387    }
388
389    /**
390     * Creates the target container where the import will // TODO Auto-generated constructor stub }be done. Can be
391     * overridden in subclasses.
392     *
393     * @return
394     */
395    protected DocumentModel createTargetContainer() {
396        try {
397            return session.getDocument(new PathRef(importWritePath));
398        } catch (DocumentNotFoundException e) {
399            log.error(e.getMessage());
400            throw e;
401        }
402    }
403
404    public ImporterThreadingPolicy getThreadPolicy() {
405        if (threadPolicy == null) {
406            threadPolicy = new DefaultMultiThreadingPolicy();
407        }
408        return threadPolicy;
409    }
410
411    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
412        this.threadPolicy = threadPolicy;
413    }
414
415    public ImporterDocumentModelFactory getFactory() {
416        if (factory == null) {
417            factory = new DefaultDocumentModelFactory();
418        }
419        return factory;
420    }
421
422    public void setFactory(ImporterDocumentModelFactory factory) {
423        this.factory = factory;
424    }
425
426    /**
427     * @since 5.9.4
428     */
429    public void setTransactionTimeout(int transactionTimeout) {
430        this.transactionTimeout = transactionTimeout;
431    }
432
433    public void setEnablePerfLogging(boolean enablePerfLogging) {
434        this.enablePerfLogging = enablePerfLogging;
435    }
436
437    public void stopImportProcrocess() {
438        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
439            importTP.shutdownNow();
440        }
441    }
442
443    protected void notifyBeforeImport() {
444        for (ImporterListener listener : listeners) {
445            listener.beforeImport();
446        }
447    }
448
449    protected void notifyAfterImport() {
450        for (ImporterListener listener : listeners) {
451            listener.afterImport();
452        }
453    }
454
455    /**
456     * @since 7.1
457     */
458    public String getRepositoryName() {
459        return repositoryName;
460    }
461
462    /**
463     * @since 7.1
464     */
465    public void setRepositoryName(String repositoryName) {
466        this.repositoryName = repositoryName;
467    }
468
469}