/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo
 */

package org.nuxeo.elasticsearch.work;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentLocation;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.query.sql.NXQL;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.elasticsearch.Timestamp;
import org.nuxeo.runtime.api.Framework;

/**
 * Worker to index a bucket of documents.
 * <p>
 * The document ids received at construction time are split into sub-buckets whose size is read from the
 * {@code REINDEX_BUCKET_WRITE_PROPERTY} framework property; each sub-bucket is fetched with a single NXQL query and
 * submitted to {@link ElasticSearchIndexing#indexNonRecursive}.
 *
 * @since 7.1
 */
public class BucketIndexingWorker extends BaseIndexingWorker implements Work {
    private static final Log log = LogFactory.getLog(BucketIndexingWorker.class);

    private static final long serialVersionUID = -4665673026513796882L;

    /** Bucket size used when the property is unset or holds an unusable value. */
    private static final String DEFAULT_BUCKET_SIZE = "50";

    /** When true, a warning is logged once this worker has finished indexing. */
    private final boolean warnAtEnd;

    /** Number of document ids handed to this worker at construction time. */
    private final int documentCount;

    /**
     * Creates a worker for the given documents.
     *
     * @param repositoryName name of the repository holding the documents
     * @param docIds ids of the documents to index
     * @param warnAtEnd whether to log a completion warning at the end of {@link #doWork()}
     */
    public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean warnAtEnd) {
        setDocuments(repositoryName, docIds);
        documentCount = docIds.size();
        this.warnAtEnd = warnAtEnd;
    }

    @Override
    public String getTitle() {
        return " ElasticSearch bucket indexer size " + documentCount;
    }

    /**
     * Indexes all documents of this worker, one sub-bucket at a time, then optionally logs a completion warning.
     */
    @Override
    protected void doWork() {
        ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
        openSystemSession();
        // Clamp to at least 1: a zero or negative size (e.g. from an overriding getBucketSize()) would otherwise
        // make the list allocation or the flush test below fail.
        int bucketSize = Math.max(1, Math.min(documentCount, getBucketSize()));
        List<String> ids = new ArrayList<>(bucketSize);
        for (DocumentLocation doc : getDocuments()) {
            ids.add(doc.getIdRef().value);
            // ids is emptied after every flush, so reaching bucketSize means one full sub-bucket is ready
            if (ids.size() >= bucketSize) {
                indexAndClear(esi, ids);
            }
        }
        // flush the last, partially filled sub-bucket
        if (!ids.isEmpty()) {
            indexAndClear(esi, ids);
        }
        if (warnAtEnd) {
            log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath()));
        }
    }

    /** Submits the given ids for non-recursive indexing, then empties the list so it can be refilled. */
    private void indexAndClear(ElasticSearchIndexing esi, List<String> ids) {
        esi.indexNonRecursive(getIndexingCommands(session, ids));
        ids.clear();
    }

    /**
     * Builds one {@code INSERT} indexing command per document found for the given ids.
     * <p>
     * All commands of the returned list share the same order value, taken once before the documents are fetched.
     *
     * @param session session used to fetch the documents
     * @param ids ids of the documents to index
     * @return the indexing commands, one per fetched document
     */
    private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) {
        List<IndexingCommand> ret = new ArrayList<>(ids.size());
        long now = Timestamp.currentTimeMicros();
        for (DocumentModel doc : fetchDocuments(session, ids)) {
            // NOTE(review): the two boolean flags are presumably "sync" and "recurse" - confirm against
            // IndexingCommand
            IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false);
            cmd.setOrder(now);
            ret.add(cmd);
        }
        return ret;
    }

    /**
     * Fetches the documents matching the given ids with a single NXQL query.
     *
     * @param session session used to run the query
     * @param ids ids of the documents to fetch
     * @return the documents returned by the query, or an empty list when {@code ids} is empty
     */
    private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) {
        if (ids.isEmpty()) {
            // "IN ()" is not a valid clause, and there is nothing to fetch anyway
            return new ArrayList<>();
        }
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN (");
        for (int i = 0; i < ids.size(); i++) {
            sb.append(NXQL.escapeString(ids.get(i)));
            if (i < ids.size() - 1) {
                sb.append(", ");
            }
        }
        sb.append(")");
        // read invalidation
        session.save();
        return session.query(sb.toString());
    }

    /**
     * Returns the maximum number of documents fetched and submitted for indexing in one go.
     * <p>
     * The value is read from the {@code REINDEX_BUCKET_WRITE_PROPERTY} framework property. When the property is
     * unset, is not an integer, or is not strictly positive, the default bucket size is returned instead (with a
     * warning in the two latter cases).
     *
     * @return a strictly positive bucket size
     */
    protected int getBucketSize() {
        int defaultSize = Integer.parseInt(DEFAULT_BUCKET_SIZE);
        String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE);
        if (value == null) {
            return defaultSize;
        }
        try {
            int size = Integer.parseInt(value.trim());
            if (size > 0) {
                return size;
            }
        } catch (NumberFormatException e) {
            // handled below together with the non-positive case
        }
        log.warn(String.format("Invalid value '%s' for property %s, using default bucket size %s", value,
                REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE));
        return defaultSize;
    }

}