001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core;
020
021import java.util.List;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.nuxeo.ecm.core.work.AbstractWork;
026import org.nuxeo.runtime.transaction.TransactionHelper;
027
028/**
029 * Abstract Work to process a list of documents.
030 *
031 * @since 9.10
032 */
033public abstract class BatchProcessorWork extends AbstractWork {
034
035    private static final long serialVersionUID = 1L;
036
037    private static final Log log = LogFactory.getLog(BatchProcessorWork.class);
038
039    public BatchProcessorWork(String repositoryName, List<String> docIds, String originatingUsername) {
040        setDocuments(repositoryName, docIds);
041        setOriginatingUsername(originatingUsername);
042    }
043
044    @Override
045    public int getRetryCount() {
046        // even read-only threads may encounter concurrent update exceptions when trying to read
047        // a previously deleted complex property due to read committed semantics (see NXP-17384)
048        return 1;
049    }
050
051    /**
052     * The batch size to use.
053     */
054    public abstract int getBatchSize();
055
056    @Override
057    public void work() {
058        int size = docIds.size();
059        int batchSize = getBatchSize();
060        if (log.isDebugEnabled()) {
061            log.debug(getTitle() + ": Starting processing: " + size + " documents with batch size: " + batchSize);
062        }
063        openSystemSession();
064        setProgress(new Progress(0, size));
065        setStatus("Processing");
066
067        for (int start = 0; start < size; start += batchSize) {
068            int end = start + batchSize;
069            if (end > size) {
070                end = size;
071            }
072            List<String> batch = docIds.subList(start, end);
073            // process the batch
074            processBatch(batch);
075            setProgress(new Progress(end, size));
076            // next batch
077            TransactionHelper.commitOrRollbackTransaction();
078            TransactionHelper.startTransaction();
079        }
080
081        if (log.isDebugEnabled()) {
082            log.debug(getTitle() + ": Finished processing for batch of size:" + size);
083        }
084        setStatus("Done");
085    }
086
087    public abstract void processBatch(List<String> docIds);
088
089}