/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo
 */

package org.nuxeo.elasticsearch.work;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentLocation;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.query.sql.NXQL;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;

/**
 * Worker to index a bucket of documents.
 * <p>
 * The document ids handed to the constructor are processed in sub-buckets: for each sub-bucket the documents are
 * fetched with a single NXQL query and submitted to {@link ElasticSearchIndexing#indexNonRecursive} as
 * {@link Type#INSERT} commands. The sub-bucket size is read from the {@code REINDEX_BUCKET_WRITE_PROPERTY} framework
 * property, see {@link #getBucketSize()}.
 *
 * @since 7.1
 */
public class BucketIndexingWorker extends BaseIndexingWorker implements Work {
    private static final Log log = LogFactory.getLog(BucketIndexingWorker.class);

    private static final long serialVersionUID = -4665673026513796882L;

    /** Bucket size used when the property is not set or holds an unusable value. */
    private static final String DEFAULT_BUCKET_SIZE = "50";

    /** When {@code true}, a WARN message is logged once all documents have been submitted. */
    private final boolean warnAtEnd;

    /** Number of document ids given at construction time. */
    private final int documentCount;

    /**
     * Creates a worker for the given documents.
     *
     * @param repositoryName the repository name, passed with the ids to {@code setDocuments}
     * @param docIds the ids of the documents to index
     * @param warnAtEnd whether to log a WARN message when the work is done
     */
    public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean warnAtEnd) {
        setDocuments(repositoryName, docIds);
        documentCount = docIds.size();
        this.warnAtEnd = warnAtEnd;
    }

    @Override
    public String getTitle() {
        return " ElasticSearch bucket indexer size " + documentCount;
    }

    /**
     * Indexes all documents of this work, one sub-bucket at a time.
     */
    @Override
    protected void doWork() {
        ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
        // NOTE(review): presumably initializes the inherited 'session' used below - confirm in BaseIndexingWorker
        openSystemSession();
        // Clamp to at least 1: a zero size would make the modulo-style flush test divide by zero and a negative one
        // would be rejected by the ArrayList constructor. This can happen with an empty id list or with an
        // overriding getBucketSize() that returns a non-positive value.
        int bucketSize = Math.max(1, Math.min(documentCount, getBucketSize()));
        List<String> ids = new ArrayList<>(bucketSize);
        for (DocumentLocation doc : getDocuments()) {
            ids.add(doc.getIdRef().value);
            // the list is cleared on every flush, so reaching bucketSize means the sub-bucket is full
            if (ids.size() >= bucketSize) {
                esi.indexNonRecursive(getIndexingCommands(session, ids));
                ids.clear();
            }
        }
        // flush the last, partially filled sub-bucket
        if (!ids.isEmpty()) {
            esi.indexNonRecursive(getIndexingCommands(session, ids));
            ids.clear();
        }
        if (warnAtEnd) {
            log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath()));
        }
    }

    /**
     * Builds one {@link Type#INSERT} indexing command per document returned by
     * {@link #fetchDocuments(CoreSession, List)}.
     *
     * @param session the session used to fetch the documents
     * @param ids the document ids of the current sub-bucket
     * @return the indexing commands, one per fetched document
     */
    private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) {
        List<IndexingCommand> ret = new ArrayList<>(ids.size());
        for (DocumentModel doc : fetchDocuments(session, ids)) {
            // NOTE(review): the meaning of the two 'false' flags is defined by IndexingCommand - confirm there
            IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false);
            ret.add(cmd);
        }
        return ret;
    }

    /**
     * Fetches the documents matching the given ids with a single NXQL {@code IN} query.
     *
     * @param session the session to query with
     * @param ids the document ids to fetch; callers in this class never pass an empty list
     * @return the documents returned by the query
     */
    private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) {
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN (");
        for (int i = 0; i < ids.size(); i++) {
            sb.append(NXQL.escapeString(ids.get(i)));
            if (i < ids.size() - 1) {
                sb.append(", ");
            }
        }
        sb.append(")");
        // read invalidation
        session.save();
        return session.query(sb.toString());
    }

    /**
     * Returns the maximum number of documents fetched and submitted in one go.
     * <p>
     * The value is read from the {@code REINDEX_BUCKET_WRITE_PROPERTY} framework property. If it is missing, not a
     * number, or not strictly positive, the default of {@value #DEFAULT_BUCKET_SIZE} is used and a warning is logged
     * for unusable values, instead of failing the whole work.
     *
     * @return a strictly positive bucket size
     */
    protected int getBucketSize() {
        String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE);
        int size;
        try {
            size = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            // treated like any other unusable value below
            size = 0;
        }
        if (size <= 0) {
            log.warn(String.format("Invalid value '%s' for property %s, using default bucket size %s", value,
                    REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE));
            size = Integer.parseInt(DEFAULT_BUCKET_SIZE);
        }
        return size;
    }

}