001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.joda.time.DateTime;
023import org.joda.time.Minutes;
024import org.nuxeo.common.utils.ExceptionUtils;
025import org.nuxeo.ecm.core.api.CoreInstance;
026import org.nuxeo.ecm.core.api.CoreSession;
027import org.nuxeo.ecm.core.api.DocumentModel;
028import org.nuxeo.ecm.core.api.PathRef;
029import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
030import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
031import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
032import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
033import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
034import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
035import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
036
037/**
038 * @since 8.3
039 */
040public class QueueImporter {
041
042    protected ImporterLogger log = null;
043
044    private boolean isTerminated = false;
045
046    private boolean mustStop = false;
047
048    protected long processedNodesConsumer = 0L;
049
050    protected ImportStat importStat = new ImportStat();
051
052    protected long nbDocsCreated = 0L;
053
054    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
055
056    public QueueImporter(ImporterLogger log) {
057        this.log = log;
058    }
059
060    public void mustStop() {
061        mustStop = true;
062    }
063
064    public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName,
065            int batchSize, ConsumerFactory factory) {
066        log.info("Starting import process");
067
068        producer.init(manager);
069
070        // start the producer
071        Thread p = new Thread(producer);
072        p.setName("import-Producer");
073
074        p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
075
076            @Override
077            public void uncaughtException(Thread t, Throwable e) {
078                log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e);
079                producer.mustStop();
080            }
081        });
082        p.start();
083
084        Exception finalException = null;
085        CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName);
086        DateTime importStarted = new DateTime();
087        try {
088            List<Thread> consumerThreads = new ArrayList<Thread>();
089            List<Consumer> consumers = new ArrayList<Consumer>();
090            enableFilters();
091
092            DocumentModel root = session.getDocument(new PathRef(importPath));
093            for (int i = 0; i < manager.getNBConsumers(); i++) {
094                Consumer c;
095                c = factory.createConsumer(log, root, batchSize, manager.getQueue(i));
096
097                consumers.add(c);
098                Thread ct = new Thread(c);
099
100                ct.setName("import-Consumer" + i);
101                ct.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
102
103                    @Override
104                    public void uncaughtException(Thread t, Throwable e) {
105                        log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e);
106                        c.mustStop();
107                    }
108                });
109                ct.start();
110                consumerThreads.add(ct);
111            }
112
113            try {
114                while (!producer.isTerminated() && !mustStop) {
115                    Thread.sleep(50);
116                    log.debug("waiting for producer to be completed. Processed docs: " + producer.getNbProcessed());
117
118                    // Check if consumers are still alive
119                    boolean consumersTerminated = true;
120                    for (Consumer c : consumers) {
121                        consumersTerminated = consumersTerminated && c.isTerminated();
122                    }
123                    // Otherwise stop the producer
124                    if (consumersTerminated) {
125                        log.error("all consumers are terminated,but producer is still alive. Stopping producer.");
126                        producer.mustStop();
127                    }
128                }
129            } catch (InterruptedException e) {
130                log.error("Error while waiting for producder", e);
131                finalException = e;
132                ExceptionUtils.checkInterrupt(e);
133            } finally {
134                if (!producer.isTerminated()) {
135                    log.warn("Forcibly stopping producer");
136                    producer.mustStop();
137                }
138            }
139
140            Exception pe = producer.getError();
141            if (pe != null) {
142                log.error("Error during producer execution", pe);
143                finalException = pe;
144                for (Consumer c : consumers) {
145                    c.mustStop();
146                }
147            } else {
148                for (Consumer c : consumers) {
149                    c.canStop();
150                }
151            }
152
153            try {
154                int iLoop = 0;
155
156                while (!isTerminated && !mustStop) {
157                    double totalImmediateThroughput = 0;
158                    double totalThroughput = 0;
159                    nbDocsCreated = 0;
160
161                    Thread.sleep(100);
162                    isTerminated = true;
163                    iLoop++;
164                    for (Consumer c : consumers) {
165                        isTerminated = isTerminated && c.isTerminated();
166                        nbDocsCreated += c.getNbDocsCreated();
167                        totalImmediateThroughput += c.getImmediateThroughput();
168                        totalThroughput += c.getThroughput();
169                    }
170
171                    if (iLoop % 30 == 0) {
172                        log.debug("waiting for consumers to be completed. Processed Docs: " + nbDocsCreated + " -- "
173                                + totalImmediateThroughput + " docs/s -- " + totalThroughput + " docs/s");
174                    }
175                }
176                // Consumers are done, get total number of nodes imported
177                for (Consumer c : consumers) {
178                    processedNodesConsumer += c.getNbProcessed();
179                }
180                nbDocsCreated = 0;
181                // Consumers are done, get total number of docs created
182                for (Consumer c : consumers) {
183                    nbDocsCreated += c.getNbDocsCreated();
184                    importStat.merge(c.getImportStat());
185                }
186
187            } catch (InterruptedException e) {
188                log.error("Error while waiting for consumers", e);
189                finalException = e;
190                ExceptionUtils.checkInterrupt(e);
191            } finally {
192                for (Consumer c : consumers) {
193                    if (!c.isTerminated()) {
194                        log.warn("Forcibly stopping consumer");
195                        c.mustStop();
196                    }
197                }
198            }
199
200        } catch (Exception e) {
201            log.error("Error while importing", e);
202            finalException = e;
203        } finally {
204            disableFilters(finalException);
205            if (session != null) {
206                session.close();
207                session = null;
208            }
209        }
210
211        if (producer.getNbProcessed() != processedNodesConsumer) {
212            log.error(
213                    String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost",
214                            producer.getNbProcessed(), processedNodesConsumer));
215        }
216
217        DateTime importFinished = new DateTime();
218
219        log.info(String.format("End of import process : Imported %s docs in %s mn. ", nbDocsCreated,
220                Minutes.minutesBetween(importStarted, importFinished).getMinutes()));
221
222    }
223
224    public ImportStat getImportStat() {
225        return importStat;
226    }
227
228    public void addFilter(ImporterFilter filter) {
229        log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter.toString(),
230                hashCode()));
231        filters.add(filter);
232    }
233
234    public boolean isRunning() {
235        return !isTerminated;
236    }
237
238    public long getCreatedDocsCounter() {
239        return nbDocsCreated;
240    }
241
242    protected void enableFilters() {
243        for (ImporterFilter filter : filters) {
244            log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", filter.toString(),
245                    hashCode()));
246            filter.handleBeforeImport();
247        }
248        if (filters.size() == 0) {
249            log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode()));
250        }
251    }
252
253    protected void disableFilters(Exception finalException) {
254        for (ImporterFilter filter : filters) {
255            filter.handleAfterImport(finalException);
256        }
257
258    }
259
260}