001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.ecm.platform.importer.queue;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.List;
022
023import org.joda.time.DateTime;
024import org.joda.time.Minutes;
025import org.joda.time.Seconds;
026import org.nuxeo.common.utils.ExceptionUtils;
027import org.nuxeo.ecm.core.api.CoreInstance;
028import org.nuxeo.ecm.core.api.CoreSession;
029import org.nuxeo.ecm.core.api.DocumentModel;
030import org.nuxeo.ecm.core.api.PathRef;
031import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
032import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
033import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
034import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
035import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
036import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
037import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
038import org.nuxeo.ecm.platform.importer.source.SourceNode;
039
040
041/**
042 * @since 8.3
043 */
044public class QueueImporter {
045
046    protected ImporterLogger log = null;
047
048    protected long processedNodesConsumer = 0L;
049
050    protected long unprocessedNodesConsumer = 0L;
051
052    protected long nbDocsCreated = 0L;
053
054    protected volatile boolean isRunning = false;
055
056    protected final ImportStat importStat = new ImportStat();
057
058    protected final List<ImporterFilter> filters = new ArrayList<>();
059
060    protected final List<Thread> consumerThreads = new ArrayList<>();
061
062    protected Thread producerThread;
063
064    public QueueImporter(ImporterLogger log) {
065        this.log = log;
066    }
067
068    public void importDocuments(Producer producer, QueuesManager manager, String importPath, String repositoryName,
069                                int batchSize, ConsumerFactory factory) {
070        importDocuments(Collections.singletonList(producer), manager, importPath, repositoryName, batchSize, factory);
071    }
072
073    public void importDocuments(List<Producer> producers, QueuesManager manager, String importPath, String repositoryName,
074                                int batchSize, ConsumerFactory factory) {
075        log.info("importer: Starting import process");
076        isRunning = true;
077        Exception finalException = null;
078        DateTime importStarted = new DateTime();
079        long processed = 0;
080        enableFilters();
081        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)){
082            producers.get(0).init(manager);
083            DocumentModel root = session.getDocument(new PathRef(importPath));
084            startProducerThread(producers.get(0));
085            List<Consumer> consumers = startConsumerPool(manager, root, batchSize, factory);
086            finalException = waitForProducer(producers.get(0));
087            processed = producers.get(0).getNbProcessed();
088            for (int i = 1; i< producers.size(); i++) {
089                startProducerThread(producers.get(i));
090                finalException = waitForProducer(producers.get(i));
091                processed += producers.get(i).getNbProcessed();
092            }
093            consumersCanStop(consumers);
094            finalException = waitForConsumers(consumers);
095            checkConsumerQueues(manager);
096            updateStats(consumers, producers);
097        } catch (Exception e) {
098            log.error("Error while importing", e);
099            finalException = e;
100        } finally {
101            disableFilters(finalException);
102            isRunning = false;
103        }
104
105        DateTime importFinished = new DateTime();
106        log.info(String.format("import: End of process: producer send %d docs, consumer receive %d docs, creating %d docs (include retries) in %s mn, rate %.2f doc/s.",
107                processed, processedNodesConsumer, nbDocsCreated,
108                Minutes.minutesBetween(importStarted, importFinished).getMinutes(),
109                processedNodesConsumer/(float) Seconds.secondsBetween(importStarted, importFinished).getSeconds()));
110    }
111
112    protected void checkConsumerQueues(QueuesManager manager) {
113        unprocessedNodesConsumer = 0;
114        for (int i = 0; i < manager.count(); i++) {
115            while (! manager.isEmpty(i)) {
116                log.error("Queue of conusmer " + i + " not empty, draining " + manager.size(i)  + " nodes to errors");
117                unprocessedNodesConsumer += manager.size(i);
118                do {
119                    SourceNode node = manager.poll(i);
120                    if (node != null) {
121                        log.error("Unable to import " + node.getName() + " by consumer " + i);
122                    }
123                } while (manager.isEmpty(i));
124            }
125        }
126    }
127
128    private void updateStats(List<Consumer> consumers, List<Producer> producers) {
129        nbDocsCreated = 0;
130        for (Consumer c : consumers) {
131            processedNodesConsumer += c.getNbProcessed();
132            nbDocsCreated += c.getNbDocsCreated();
133        }
134        if (unprocessedNodesConsumer > 0) {
135            log.error("Total number of unprocessed doc because of consumers unexpected end: " + unprocessedNodesConsumer);
136        }
137        for (Producer producer: producers) {
138            if (producer.getNbProcessed() != processedNodesConsumer) {
139                log.error(
140                        String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost",
141                                producer.getNbProcessed(), processedNodesConsumer));
142            }
143        }
144
145    }
146
147    private Exception waitForConsumers(List<Consumer> consumers) {
148        Exception ret = null;
149        try {
150            while (!consumersTerminated(consumers)) {
151                Thread.sleep(100);
152            }
153        } catch (InterruptedException e) {
154            log.error("importer: Got an InterruptedException", e);
155            ret = e;
156            ExceptionUtils.checkInterrupt(e);
157        } finally {
158            for (Consumer consumer : consumers) {
159                if (!consumer.isTerminated()) {
160                    log.warn("Forcibly stopping consumer");
161                    consumer.mustStop();
162                }
163            }
164        }
165        log.info("importer: All consumers has terminated their work.");
166        for (Thread thread: consumerThreads) {
167            try {
168                thread.join();
169            } catch (InterruptedException e) {
170                log.error("importer: Got an InterruptedException", e);
171                ExceptionUtils.checkInterrupt(e);
172            }
173        }
174        log.info("importer: All consumers threads terminated");
175        consumerThreads.clear();
176
177        int processed = 0;
178        int i = 0;
179        for (Consumer consumer : consumers) {
180            processed += consumer.getNbProcessed();
181            i += 1;
182        }
183        log.info("importer: " + i +  " consumers terminated, processed: " + processed );
184
185        return ret;
186    }
187
188    private void consumersCanStop(List<Consumer> consumers) {
189        consumers.forEach(TaskRunner::canStop);
190    }
191
192    protected Exception waitForProducer(Producer producer) {
193        Exception ret = null;
194        try {
195            while (producerThread.isAlive() && !producer.isTerminated()) {
196                Thread.sleep(100);
197            }
198            log.info("importer: producer terminated its work");
199            producerThread.join();
200            log.info("importer: producer thread terminated");
201            producerThread = null;
202        } catch (InterruptedException e) {
203            log.error("importer: Got an InterruptedException", e);
204            ExceptionUtils.checkInterrupt(e);
205            ret = e;
206        } finally {
207            if (!producer.isTerminated()) {
208                log.warn("Forcibly stopping producer");
209                producer.mustStop();
210            }
211        }
212        log.info("importer: producer terminated processed: " + producer.getNbProcessed());
213        return ret;
214    }
215
216    protected boolean consumersTerminated(List<Consumer> consumers) {
217        for (Consumer c : consumers) {
218            if (!c.isTerminated()) {
219                return false;
220            }
221        }
222        return true;
223    }
224
225    private List<Consumer> startConsumerPool(QueuesManager manager, DocumentModel root, int batchSize, ConsumerFactory factory) {
226                ArrayList<Consumer> ret = new ArrayList<>(manager.count());
227                for (int i = 0; i < manager.count(); i++) {
228            Consumer c;
229            c = factory.createConsumer(log, root, batchSize, manager, i);
230            ret.add(c);
231            Thread ct = new Thread(c);
232            ct.setName("import-Consumer" + i);
233            ct.setUncaughtExceptionHandler((t, e) -> {
234                log.error("Uncaught exception in " + ct.getName() + ". Consumer is going to be stopped", e);
235            });
236            ct.start();
237            consumerThreads.add(ct);
238        }
239        return ret;
240    }
241
242    protected void startProducerThread(final Producer producer) {
243        Thread p = new Thread(producer);
244        p.setName("import-Producer");
245        p.setUncaughtExceptionHandler((t, e) -> {
246            log.error("Uncaught exception in " + p.getName() + ". Producer is going to be stopped", e);
247            producer.mustStop();
248        });
249        p.start();
250        producerThread = p;
251    }
252
253    public ImportStat getImportStat() {
254        return importStat;
255    }
256
257    public void addFilter(ImporterFilter filter) {
258        log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", filter.toString(),
259                hashCode()));
260        filters.add(filter);
261    }
262
263    public long getCreatedDocsCounter() {
264        return nbDocsCreated;
265    }
266
267    protected void enableFilters() {
268        for (ImporterFilter filter : filters) {
269            log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", filter.toString(),
270                    hashCode()));
271            filter.handleBeforeImport();
272        }
273        if (filters.size() == 0) {
274            log.debug(String.format("No filters are registered on the importer with hash code %s", hashCode()));
275        }
276    }
277
278    protected void disableFilters(Exception finalException) {
279        for (ImporterFilter filter : filters) {
280            filter.handleAfterImport(finalException);
281        }
282
283    }
284
285    public boolean isRunning() {
286        return isRunning;
287    }
288
289}