001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Delbosc Benoit
016 */
017
018package org.nuxeo.elasticsearch.work;
019
020import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_READ_PROPERTY;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.core.api.CoreSession;
031import org.nuxeo.ecm.core.api.IterableQueryResult;
032import org.nuxeo.ecm.core.query.sql.NXQL;
033import org.nuxeo.ecm.core.work.api.Work;
034import org.nuxeo.ecm.core.work.api.WorkManager;
035import org.nuxeo.runtime.api.Framework;
036
/**
 * Worker to reindex a large number of documents.
 *
 * @since 7.1
 */
042public class ScrollingIndexingWorker extends BaseIndexingWorker implements Work {
043    private static final Log log = LogFactory.getLog(ScrollingIndexingWorker.class);
044
045    private static final long serialVersionUID = -4507677669419340384L;
046
047    private static final String DEFAULT_BUCKET_SIZE = "500";
048
049    private static final long WARN_DOC_COUNT = 500;
050
051    protected final String nxql;
052
053    protected transient WorkManager workManager;
054
055    protected long documentCount = 0;
056
057    public ScrollingIndexingWorker(String repositoryName, String nxql) {
058        this.repositoryName = repositoryName;
059        this.nxql = nxql;
060    }
061
062    @Override
063    public String getTitle() {
064        return "Elasticsearch scrolling indexer: " + nxql + ", processed " + documentCount;
065    }
066
067    @Override
068    protected void doWork() {
069        String jobName = getSchedulePath().getPath();
070        if (log.isDebugEnabled()) {
071            log.debug(String.format("Re-indexing job: %s started, NXQL: %s on repository: %s", jobName, nxql,
072                    repositoryName));
073        }
074        CoreSession session = initSession(repositoryName);
075        IterableQueryResult res = session.queryAndFetch(nxql, NXQL.NXQL);
076        int bucketCount = 0;
077        boolean warnAtEnd = false;
078        try {
079            Iterator<Map<String, Serializable>> it = res.iterator();
080            int bucketSize = getBucketSize();
081            List<String> ids = new ArrayList<>(bucketSize);
082            while (it.hasNext()) {
083                documentCount += 1;
084                ids.add((String) it.next().get(NXQL.ECM_UUID));
085                if (ids.size() == bucketSize) {
086                    scheduleBucketWorker(ids, false);
087                    ids = new ArrayList<>(bucketSize);
088                    bucketCount += 1;
089                }
090            }
091            if (documentCount > WARN_DOC_COUNT) {
092                warnAtEnd = true;
093            }
094            scheduleBucketWorker(ids, warnAtEnd);
095            if (!ids.isEmpty()) {
096                bucketCount += 1;
097            }
098        } finally {
099            res.close();
100            if (warnAtEnd || log.isDebugEnabled()) {
101                String message = String.format("Re-indexing job: %s has submited %d documents in %d bucket workers",
102                        jobName, documentCount, bucketCount);
103                if (warnAtEnd) {
104                    log.warn(message);
105                } else {
106                    log.debug(message);
107                }
108            }
109        }
110    }
111
112    protected void scheduleBucketWorker(List<String> bucket, boolean isLast) {
113        if (bucket.isEmpty()) {
114            return;
115        }
116        BucketIndexingWorker subWorker = new BucketIndexingWorker(repositoryName, bucket, isLast);
117        getWorkManager().schedule(subWorker);
118    }
119
120    protected WorkManager getWorkManager() {
121        if (workManager == null) {
122            workManager = Framework.getLocalService(WorkManager.class);
123        }
124        return workManager;
125    }
126
127    protected int getBucketSize() {
128        String value = Framework.getProperty(REINDEX_BUCKET_READ_PROPERTY, DEFAULT_BUCKET_SIZE);
129        return Integer.parseInt(value);
130    }
131
132}