001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue;
018
019import org.joda.time.DateTime;
020import org.joda.time.Minutes;
021import org.joda.time.Seconds;
022import org.nuxeo.common.utils.ExceptionUtils;
023import org.nuxeo.ecm.core.api.CoreInstance;
024import org.nuxeo.ecm.core.api.CoreSession;
025import org.nuxeo.ecm.core.api.DocumentModel;
026import org.nuxeo.ecm.core.api.PathRef;
027import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
028import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
029import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
030import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
031import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
032import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
033import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
034import org.nuxeo.ecm.platform.importer.source.SourceNode;
035
036import java.util.ArrayList;
037import java.util.List;
038import java.util.concurrent.BlockingQueue;
039
040
041/**
042 * @since 8.3
043 */
044public class QueueImporter {
045
046    protected ImporterLogger log = null;
047
048    protected long processedNodesConsumer = 0L;
049
050    protected long unprocessedNodesConsumer = 0L;
051
052    protected long nbDocsCreated = 0L;
053
054    protected volatile boolean isRunning = false;
055
056    protected final ImportStat importStat = new ImportStat();
057
058    protected final List<ImporterFilter> filters = new ArrayList<>();
059
060    protected final List<Thread> consumerThreads = new ArrayList<>();
061
062    protected Thread producerThread;
063
064    public QueueImporter(ImporterLogger log) {
065        this.log = log;
066    }
067
068    public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName,
069                                int batchSize, ConsumerFactory factory) {
070        log.info("importer: Starting import process");
071        isRunning = true;
072        Exception finalException = null;
073        DateTime importStarted = new DateTime();
074
075        enableFilters();
076        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)){
077            producer.init(manager);
078            DocumentModel root = session.getDocument(new PathRef(importPath));
079            startProducerThread(producer);
080            List<Consumer> consumers = startConsumerPool(manager, root, batchSize, factory);
081            finalException = waitForProducer(producer);
082            consumersCanStop(consumers);
083            finalException = waitForConsumers(consumers);
084            checkConsumerQueues(manager);
085            updateStats(consumers, producer);
086
087        } catch (Exception e) {
088            log.error("Error while importing", e);
089            finalException = e;
090        } finally {
091            disableFilters(finalException);
092            isRunning = false;
093        }
094
095        DateTime importFinished = new DateTime();
096        log.info(String.format("import: End of process: producer send %d docs, consumer receive %d docs, creating %d docs (include retries) in %s mn, rate %.2f doc/s.",
097                producer.getNbProcessed(), processedNodesConsumer, nbDocsCreated,
098                Minutes.minutesBetween(importStarted, importFinished).getMinutes(),
099                processedNodesConsumer/(float) Seconds.secondsBetween(importStarted, importFinished).getSeconds()));
100    }
101
102    protected void checkConsumerQueues(QueuesManager manager) {
103        unprocessedNodesConsumer = 0;
104        for (int i = 0; i < manager.getNBConsumers(); i++) {
105            BlockingQueue<SourceNode> queue = manager.getQueue(i);
106            if (! queue.isEmpty()) {
107                log.error("Queue of conusmer " + i + " not empty, draining " + queue.size()  + " nodes to errors");
108                unprocessedNodesConsumer += queue.size();
109                do {
110                    SourceNode node = queue.poll();
111                    if (node != null) {
112                        log.error("Unable to import " + node.getName() + " by consumer " + i);
113                    }
114                } while (!queue.isEmpty());
115            }
116        }
117    }
118
119    private void updateStats(List<Consumer> consumers, Producer producer) {
120        nbDocsCreated = 0;
121        for (Consumer c : consumers) {
122            processedNodesConsumer += c.getNbProcessed();
123            nbDocsCreated += c.getNbDocsCreated();
124        }
125        if (unprocessedNodesConsumer > 0) {
126            log.error("Total number of unprocessed doc because of consumers unexpected end: " + unprocessedNodesConsumer);
127        }
128        if (producer.getNbProcessed() != processedNodesConsumer) {
129            log.error(
130                    String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost",
131                            producer.getNbProcessed(), processedNodesConsumer));
132        }
133
134    }
135
136    private Exception waitForConsumers(List<Consumer> consumers) {
137        Exception ret = null;
138        try {
139            while (!consumersTerminated(consumers)) {
140                Thread.sleep(100);
141            }
142        } catch (InterruptedException e) {
143            log.error("importer: Got an InterruptedException", e);
144            ret = e;
145            ExceptionUtils.checkInterrupt(e);
146        } finally {
147            for (Consumer consumer : consumers) {
148                if (!consumer.isTerminated()) {
149                    log.warn("Forcibly stopping consumer");
150                    consumer.mustStop();
151                }
152            }
153        }
154        log.info("importer: All consumers has terminated their work.");
155        for (Thread thread: consumerThreads) {
156            try {
157                thread.join();
158            } catch (InterruptedException e) {
159                log.error("importer: Got an InterruptedException", e);
160                ExceptionUtils.checkInterrupt(e);
161            }
162        }
163        log.info("importer: All consumers threads terminated");
164        consumerThreads.clear();
165
166        int processed = 0;
167        int i = 0;
168        for (Consumer consumer : consumers) {
169            processed += consumer.getNbProcessed();
170            i += 1;
171        }
172        log.info("importer: " + i +  " consumers terminated, processed: " + processed );
173
174        return ret;
175    }
176
177    private void consumersCanStop(List<Consumer> consumers) {
178        consumers.forEach(TaskRunner::canStop);
179    }
180
181    protected Exception waitForProducer(Producer producer) {
182        Exception ret = null;
183        try {
184            while (producerThread.isAlive() && !producer.isTerminated()) {
185                Thread.sleep(100);
186            }
187            log.info("importer: producer terminated its work");
188            producerThread.join();
189            log.info("importer: producer thread terminated");
190            producerThread = null;
191        } catch (InterruptedException e) {
192            log.error("importer: Got an InterruptedException", e);
193            ExceptionUtils.checkInterrupt(e);
194            ret = e;
195        } finally {
196            if (!producer.isTerminated()) {
197                log.warn("Forcibly stopping producer");
198                producer.mustStop();
199            }
200        }
201        log.info("importer: producer terminated processed: " + producer.getNbProcessed());
202        return ret;
203    }
204
205    protected boolean consumersTerminated(List<Consumer> consumers) {
206        for (Consumer c : consumers) {
207            if (!c.isTerminated()) {
208                return false;
209            }
210        }
211        return true;
212    }
213
214    private List<Consumer> startConsumerPool(QueuesManager manager, DocumentModel root, int batchSize, ConsumerFactory factory) {
215        ArrayList<Consumer> ret = new ArrayList<>(manager.getNBConsumers());
216        for (int i = 0; i < manager.getNBConsumers(); i++) {
217            Consumer c;
218            c = factory.createConsumer(log, root, batchSize, manager.getQueue(i));
219            ret.add(c);
220            Thread ct = new Thread(c);
221            ct.setName("import-Consumer" + i);
222            ct.setUncaughtExceptionHandler((t, e) -> {
223                log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e);
224            });
225            ct.start();
226            consumerThreads.add(ct);
227        }
228        return ret;
229    }
230
231    protected void startProducerThread(final Producer producer) {
232        Thread p = new Thread(producer);
233        p.setName("import-Producer");
234        p.setUncaughtExceptionHandler((t, e) -> {
235            log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e);
236            producer.mustStop();
237        });
238        p.start();
239        producerThread = p;
240    }
241
242    public ImportStat getImportStat() {
243        return importStat;
244    }
245
246    public void addFilter(ImporterFilter filter) {
247        log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter.toString(),
248                hashCode()));
249        filters.add(filter);
250    }
251
252    public long getCreatedDocsCounter() {
253        return nbDocsCreated;
254    }
255
256    protected void enableFilters() {
257        for (ImporterFilter filter : filters) {
258            log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", filter.toString(),
259                    hashCode()));
260            filter.handleBeforeImport();
261        }
262        if (filters.size() == 0) {
263            log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode()));
264        }
265    }
266
267    protected void disableFilters(Exception finalException) {
268        for (ImporterFilter filter : filters) {
269            filter.handleAfterImport(finalException);
270        }
271
272    }
273
274    public boolean isRunning() {
275        return isRunning;
276    }
277
278}