001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Delbosc Benoit
018 */
019
020package org.nuxeo.elasticsearch.work;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_READ_PROPERTY;
023
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.core.api.IterableQueryResult;
033import org.nuxeo.ecm.core.query.sql.NXQL;
034import org.nuxeo.ecm.core.work.api.Work;
035import org.nuxeo.ecm.core.work.api.WorkManager;
036import org.nuxeo.runtime.api.Framework;
037
038/**
039 * Worker to reindex a large amount of document
040 *
041 * @since 7.1
042 */
043public class ScrollingIndexingWorker extends BaseIndexingWorker implements Work {
044    private static final Log log = LogFactory.getLog(ScrollingIndexingWorker.class);
045
046    private static final long serialVersionUID = -4507677669419340384L;
047
048    private static final String DEFAULT_BUCKET_SIZE = "500";
049
050    private static final long WARN_DOC_COUNT = 500;
051
052    protected final String nxql;
053
054    protected transient WorkManager workManager;
055
056    protected long documentCount = 0;
057
058    public ScrollingIndexingWorker(String repositoryName, String nxql) {
059        this.repositoryName = repositoryName;
060        this.nxql = nxql;
061    }
062
063    @Override
064    public String getTitle() {
065        return "Elasticsearch scrolling indexer: " + nxql + ", processed " + documentCount;
066    }
067
068    @Override
069    protected void doWork() {
070        String jobName = getSchedulePath().getPath();
071        if (log.isDebugEnabled()) {
072            log.debug(String.format("Re-indexing job: %s started, NXQL: %s on repository: %s", jobName, nxql,
073                    repositoryName));
074        }
075        openSystemSession();
076        IterableQueryResult res = session.queryAndFetch(nxql, NXQL.NXQL);
077        int bucketCount = 0;
078        boolean warnAtEnd = false;
079        try {
080            Iterator<Map<String, Serializable>> it = res.iterator();
081            int bucketSize = getBucketSize();
082            List<String> ids = new ArrayList<>(bucketSize);
083            while (it.hasNext()) {
084                documentCount += 1;
085                ids.add((String) it.next().get(NXQL.ECM_UUID));
086                if (ids.size() == bucketSize) {
087                    scheduleBucketWorker(ids, false);
088                    ids = new ArrayList<>(bucketSize);
089                    bucketCount += 1;
090                }
091            }
092            if (documentCount > WARN_DOC_COUNT) {
093                warnAtEnd = true;
094            }
095            scheduleBucketWorker(ids, warnAtEnd);
096            if (!ids.isEmpty()) {
097                bucketCount += 1;
098            }
099        } finally {
100            res.close();
101            if (warnAtEnd || log.isDebugEnabled()) {
102                String message = String.format("Re-indexing job: %s has submited %d documents in %d bucket workers",
103                        jobName, documentCount, bucketCount);
104                if (warnAtEnd) {
105                    log.warn(message);
106                } else {
107                    log.debug(message);
108                }
109            }
110        }
111    }
112
113    protected void scheduleBucketWorker(List<String> bucket, boolean isLast) {
114        if (bucket.isEmpty()) {
115            return;
116        }
117        BucketIndexingWorker subWorker = new BucketIndexingWorker(repositoryName, bucket, isLast);
118        getWorkManager().schedule(subWorker);
119    }
120
121    protected WorkManager getWorkManager() {
122        if (workManager == null) {
123            workManager = Framework.getLocalService(WorkManager.class);
124        }
125        return workManager;
126    }
127
128    protected int getBucketSize() {
129        String value = Framework.getProperty(REINDEX_BUCKET_READ_PROPERTY, DEFAULT_BUCKET_SIZE);
130        return Integer.parseInt(value);
131    }
132
133}