001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Delbosc Benoit 016 */ 017 018package org.nuxeo.elasticsearch.work; 019 020import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_READ_PROPERTY; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.ecm.core.api.CoreSession; 031import org.nuxeo.ecm.core.api.IterableQueryResult; 032import org.nuxeo.ecm.core.query.sql.NXQL; 033import org.nuxeo.ecm.core.work.api.Work; 034import org.nuxeo.ecm.core.work.api.WorkManager; 035import org.nuxeo.runtime.api.Framework; 036 037/** 038 * Worker to reindex a large amount of document 039 * 040 * @since 7.1 041 */ 042public class ScrollingIndexingWorker extends BaseIndexingWorker implements Work { 043 private static final Log log = LogFactory.getLog(ScrollingIndexingWorker.class); 044 045 private static final long serialVersionUID = -4507677669419340384L; 046 047 private static final String DEFAULT_BUCKET_SIZE = "500"; 048 049 private static final long WARN_DOC_COUNT = 500; 050 051 protected final String nxql; 052 053 protected transient WorkManager workManager; 054 055 protected long documentCount = 0; 056 057 public ScrollingIndexingWorker(String repositoryName, String nxql) { 058 this.repositoryName = repositoryName; 059 this.nxql = nxql; 060 } 061 062 @Override 063 public String getTitle() { 064 return "Elasticsearch scrolling indexer: " + nxql + ", processed " + documentCount; 065 } 066 067 @Override 068 protected void doWork() { 069 String jobName = getSchedulePath().getPath(); 070 if (log.isDebugEnabled()) { 071 log.debug(String.format("Re-indexing job: %s started, NXQL: %s on repository: %s", jobName, nxql, 072 repositoryName)); 073 } 074 CoreSession session = initSession(repositoryName); 075 IterableQueryResult res = session.queryAndFetch(nxql, NXQL.NXQL); 076 int bucketCount = 0; 077 boolean warnAtEnd = false; 078 try { 079 Iterator<Map<String, Serializable>> it = res.iterator(); 080 int bucketSize = getBucketSize(); 081 List<String> ids = new ArrayList<>(bucketSize); 082 while (it.hasNext()) { 083 documentCount += 1; 084 ids.add((String) it.next().get(NXQL.ECM_UUID)); 085 if (ids.size() == bucketSize) { 086 scheduleBucketWorker(ids, false); 087 ids = new ArrayList<>(bucketSize); 088 bucketCount += 1; 089 } 090 } 091 if (documentCount > WARN_DOC_COUNT) { 092 warnAtEnd = true; 093 } 094 scheduleBucketWorker(ids, warnAtEnd); 095 if (!ids.isEmpty()) { 096 bucketCount += 1; 097 } 098 } finally { 099 res.close(); 100 if (warnAtEnd || log.isDebugEnabled()) { 101 String message = String.format("Re-indexing job: %s has submited %d documents in %d bucket workers", 102 jobName, documentCount, bucketCount); 103 if (warnAtEnd) { 104 log.warn(message); 105 } else { 106 log.debug(message); 107 } 108 } 109 } 110 } 111 112 protected void scheduleBucketWorker(List<String> bucket, boolean isLast) { 113 if (bucket.isEmpty()) { 114 return; 115 } 116 BucketIndexingWorker subWorker = new BucketIndexingWorker(repositoryName, bucket, isLast); 117 getWorkManager().schedule(subWorker); 118 } 119 120 protected WorkManager getWorkManager() { 121 if (workManager == null) { 122 workManager = Framework.getLocalService(WorkManager.class); 123 } 124 return workManager; 125 } 126 127 protected int getBucketSize() { 128 String value = Framework.getProperty(REINDEX_BUCKET_READ_PROPERTY, DEFAULT_BUCKET_SIZE); 129 return Integer.parseInt(value); 130 } 131 132}