001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Delbosc Benoit 018 */ 019 020package org.nuxeo.elasticsearch.work; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_READ_PROPERTY; 023 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.ecm.core.api.IterableQueryResult; 033import org.nuxeo.ecm.core.query.sql.NXQL; 034import org.nuxeo.ecm.core.work.api.Work; 035import org.nuxeo.ecm.core.work.api.WorkManager; 036import org.nuxeo.runtime.api.Framework; 037 038/** 039 * Worker to reindex a large amount of document 040 * 041 * @since 7.1 042 */ 043public class ScrollingIndexingWorker extends BaseIndexingWorker implements Work { 044 private static final Log log = LogFactory.getLog(ScrollingIndexingWorker.class); 045 046 private static final long serialVersionUID = -4507677669419340384L; 047 048 private static final String DEFAULT_BUCKET_SIZE = "500"; 049 050 private static final long WARN_DOC_COUNT = 500; 051 052 protected final String nxql; 053 054 protected transient WorkManager workManager; 055 056 protected long documentCount = 0; 057 058 public ScrollingIndexingWorker(String repositoryName, String nxql) { 059 this.repositoryName = repositoryName; 060 this.nxql = nxql; 061 } 062 063 @Override 064 public String getTitle() { 065 return "Elasticsearch scrolling indexer: " + nxql + ", processed " + documentCount; 066 } 067 068 @Override 069 protected void doWork() { 070 String jobName = getSchedulePath().getPath(); 071 if (log.isDebugEnabled()) { 072 log.debug(String.format("Re-indexing job: %s started, NXQL: %s on repository: %s", jobName, nxql, 073 repositoryName)); 074 } 075 openSystemSession(); 076 IterableQueryResult res = session.queryAndFetch(nxql, NXQL.NXQL); 077 int bucketCount = 0; 078 boolean warnAtEnd = false; 079 try { 080 Iterator<Map<String, Serializable>> it = res.iterator(); 081 int bucketSize = getBucketSize(); 082 List<String> ids = new ArrayList<>(bucketSize); 083 while (it.hasNext()) { 084 documentCount += 1; 085 ids.add((String) it.next().get(NXQL.ECM_UUID)); 086 if (ids.size() == bucketSize) { 087 scheduleBucketWorker(ids, false); 088 ids = new ArrayList<>(bucketSize); 089 bucketCount += 1; 090 } 091 } 092 if (documentCount > WARN_DOC_COUNT) { 093 warnAtEnd = true; 094 } 095 scheduleBucketWorker(ids, warnAtEnd); 096 if (!ids.isEmpty()) { 097 bucketCount += 1; 098 } 099 } finally { 100 res.close(); 101 if (warnAtEnd || log.isDebugEnabled()) { 102 String message = String.format("Re-indexing job: %s has submited %d documents in %d bucket workers", 103 jobName, documentCount, bucketCount); 104 if (warnAtEnd) { 105 log.warn(message); 106 } else { 107 log.debug(message); 108 } 109 } 110 } 111 } 112 113 protected void scheduleBucketWorker(List<String> bucket, boolean isLast) { 114 if (bucket.isEmpty()) { 115 return; 116 } 117 BucketIndexingWorker subWorker = new BucketIndexingWorker(repositoryName, bucket, isLast); 118 getWorkManager().schedule(subWorker); 119 } 120 121 protected WorkManager getWorkManager() { 122 if (workManager == null) { 123 workManager = Framework.getLocalService(WorkManager.class); 124 } 125 return workManager; 126 } 127 128 protected int getBucketSize() { 129 String value = Framework.getProperty(REINDEX_BUCKET_READ_PROPERTY, DEFAULT_BUCKET_SIZE); 130 return Integer.parseInt(value); 131 } 132 133}