001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Delbosc Benoit
018 */
019
020package org.nuxeo.elasticsearch.work;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_READ_PROPERTY;
023
024import java.util.Collections;
025import java.util.List;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.core.api.ScrollResult;
030import org.nuxeo.ecm.core.work.api.Work;
031import org.nuxeo.ecm.core.work.api.WorkManager;
032import org.nuxeo.runtime.api.Framework;
033import org.nuxeo.runtime.transaction.TransactionHelper;
034
035/**
036 * Worker to reindex a large amount of document
037 *
038 * @since 7.1
039 */
040public class ScrollingIndexingWorker extends BaseIndexingWorker implements Work {
041    private static final Log log = LogFactory.getLog(ScrollingIndexingWorker.class);
042
043    private static final long serialVersionUID = -4507677669419340384L;
044
045    private static final String DEFAULT_BUCKET_SIZE = "500";
046
047    private static final long WARN_DOC_COUNT = 500;
048
049    protected final String nxql;
050
051    protected final boolean syncAlias;
052
053    protected transient WorkManager workManager;
054
055    protected long documentCount;
056
057    public ScrollingIndexingWorker(String repositoryName, String nxql) {
058        this(repositoryName, nxql, false);
059    }
060
061    public ScrollingIndexingWorker(String repositoryName, String nxql, boolean syncAlias) {
062        this.repositoryName = repositoryName;
063        this.nxql = nxql;
064        this.syncAlias = syncAlias;
065    }
066
067    @Override
068    public String getTitle() {
069        return "Elasticsearch scrolling indexer: " + nxql + ", processed " + documentCount;
070    }
071
072    @Override
073    protected void doWork() {
074        String jobName = getSchedulePath().getPath();
075        if (log.isDebugEnabled()) {
076            log.debug(String.format("Re-indexing job: %s started, NXQL: %s on repository: %s", jobName, nxql,
077                    repositoryName));
078        }
079        openSystemSession();
080        int bucketSize = getBucketSize();
081        ScrollResult<String> ret = session.scroll(nxql, bucketSize, 60);
082        int bucketCount = 0;
083        try {
084            while (ret.hasResults()) {
085                documentCount += ret.getResults().size();
086                scheduleBucketWorker(ret.getResults(), false);
087                bucketCount += 1;
088                ret = session.scroll(ret.getScrollId());
089                TransactionHelper.commitOrRollbackTransaction();
090                TransactionHelper.startTransaction();
091            }
092            if (syncAlias) {
093                scheduleBucketWorker(Collections.emptyList(), true);
094            }
095        } finally {
096            if (syncAlias || documentCount > WARN_DOC_COUNT) {
097                String message = String.format("Re-indexing job: %s has submited %d documents in %d bucket workers",
098                        jobName, documentCount, bucketCount);
099                if (syncAlias) {
100                    log.warn(message);
101                } else {
102                    log.debug(message);
103                }
104            }
105        }
106    }
107
108    protected void scheduleBucketWorker(List<String> bucket, boolean syncAlias) {
109        if (bucket.isEmpty() && !syncAlias) {
110            return;
111        }
112        BucketIndexingWorker subWorker = new BucketIndexingWorker(repositoryName, bucket, syncAlias);
113        getWorkManager().schedule(subWorker);
114    }
115
116    protected WorkManager getWorkManager() {
117        if (workManager == null) {
118            workManager = Framework.getService(WorkManager.class);
119        }
120        return workManager;
121    }
122
123    protected int getBucketSize() {
124        String value = Framework.getProperty(REINDEX_BUCKET_READ_PROPERTY, DEFAULT_BUCKET_SIZE);
125        return Integer.parseInt(value);
126    }
127
128}