001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core; 020 021import java.util.List; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.nuxeo.ecm.core.api.ScrollResult; 026import org.nuxeo.ecm.core.work.AbstractWork; 027import org.nuxeo.ecm.core.work.api.Work; 028import org.nuxeo.ecm.core.work.api.WorkManager; 029import org.nuxeo.runtime.api.Framework; 030import org.nuxeo.runtime.transaction.TransactionHelper; 031 032/** 033 * Abstract Work to find the ids of documents for which some process must be executed in batch, based on a NXQL query. 034 * 035 * @since 9.10 036 */ 037public abstract class BatchFinderWork extends AbstractWork { 038 039 private static final long serialVersionUID = 1L; 040 041 private static final Log log = LogFactory.getLog(BatchFinderWork.class); 042 043 protected static final int SCROLL_KEEPALIVE_SECONDS = 60; 044 045 protected String nxql; 046 047 public BatchFinderWork(String repositoryName, String nxql, String originatingUsername) { 048 this.repositoryName = repositoryName; 049 this.nxql = nxql; 050 setOriginatingUsername(originatingUsername); 051 } 052 053 @Override 054 public int getRetryCount() { 055 // even read-only threads may encounter concurrent update exceptions when trying to read 056 // a previously deleted complex property due to read committed semantics (see NXP-17384) 057 return 1; 058 } 059 060 /** 061 * The batch size to use. 062 */ 063 public abstract int getBatchSize(); 064 065 @Override 066 public void work() { 067 int batchSize = getBatchSize(); 068 if (log.isDebugEnabled()) { 069 log.debug(getTitle() + ": Starting batch find for query: " + nxql + " with batch size: " + batchSize); 070 } 071 openSystemSession(); 072 setProgress(Progress.PROGRESS_INDETERMINATE); 073 setStatus("Searching"); 074 075 long batchCount = 0; 076 long documentCount = 0; 077 ScrollResult<String> scroll = session.scroll(nxql, batchSize, SCROLL_KEEPALIVE_SECONDS); 078 while (scroll.hasResults()) { 079 List<String> docIds = scroll.getResults(); 080 // schedule the batch 081 if (!docIds.isEmpty()) { 082 Framework.getService(WorkManager.class).schedule(getBatchProcessorWork(docIds)); 083 } 084 batchCount += 1; 085 documentCount += docIds.size(); 086 setProgress(new Progress(documentCount, -1)); 087 // next batch 088 scroll = session.scroll(scroll.getScrollId()); 089 TransactionHelper.commitOrRollbackTransaction(); 090 TransactionHelper.startTransaction(); 091 } 092 093 if (log.isDebugEnabled()) { 094 log.debug(getTitle() + ": Submitted " + documentCount + " documents in " + batchCount 095 + " batch processor workers"); 096 } 097 setProgress(new Progress(documentCount, documentCount)); 098 setStatus("Done"); 099 } 100 101 public abstract Work getBatchProcessorWork(List<String> docIds); 102 103}