001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core;
020
021import java.util.List;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.nuxeo.ecm.core.api.ScrollResult;
026import org.nuxeo.ecm.core.work.AbstractWork;
027import org.nuxeo.ecm.core.work.api.Work;
028import org.nuxeo.ecm.core.work.api.WorkManager;
029import org.nuxeo.runtime.api.Framework;
030import org.nuxeo.runtime.transaction.TransactionHelper;
031
032/**
033 * Abstract Work to find the ids of documents for which some process must be executed in batch, based on a NXQL query.
034 *
035 * @since 9.10
036 */
037public abstract class BatchFinderWork extends AbstractWork {
038
039    private static final long serialVersionUID = 1L;
040
041    private static final Log log = LogFactory.getLog(BatchFinderWork.class);
042
043    protected static final int SCROLL_KEEPALIVE_SECONDS = 60;
044
045    protected String nxql;
046
047    public BatchFinderWork(String repositoryName, String nxql, String originatingUsername) {
048        this.repositoryName = repositoryName;
049        this.nxql = nxql;
050        setOriginatingUsername(originatingUsername);
051    }
052
053    @Override
054    public int getRetryCount() {
055        // even read-only threads may encounter concurrent update exceptions when trying to read
056        // a previously deleted complex property due to read committed semantics (see NXP-17384)
057        return 1;
058    }
059
060    /**
061     * The batch size to use.
062     */
063    public abstract int getBatchSize();
064
065    @Override
066    public void work() {
067        int batchSize = getBatchSize();
068        if (log.isDebugEnabled()) {
069            log.debug(getTitle() + ": Starting batch find for query: " + nxql + " with batch size: " + batchSize);
070        }
071        openSystemSession();
072        setProgress(Progress.PROGRESS_INDETERMINATE);
073        setStatus("Searching");
074
075        long batchCount = 0;
076        long documentCount = 0;
077        ScrollResult<String> scroll = session.scroll(nxql, batchSize, SCROLL_KEEPALIVE_SECONDS);
078        while (scroll.hasResults()) {
079            List<String> docIds = scroll.getResults();
080            // schedule the batch
081            if (!docIds.isEmpty()) {
082                Framework.getService(WorkManager.class).schedule(getBatchProcessorWork(docIds));
083            }
084            batchCount += 1;
085            documentCount += docIds.size();
086            setProgress(new Progress(documentCount, -1));
087            // next batch
088            scroll = session.scroll(scroll.getScrollId());
089            TransactionHelper.commitOrRollbackTransaction();
090            TransactionHelper.startTransaction();
091        }
092
093        if (log.isDebugEnabled()) {
094            log.debug(getTitle() + ": Submitted " + documentCount + " documents in " + batchCount
095                    + " batch processor workers");
096        }
097        setProgress(new Progress(documentCount, documentCount));
098        setStatus("Done");
099    }
100
101    public abstract Work getBatchProcessorWork(List<String> docIds);
102
103}