001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo 018 */ 019 020package org.nuxeo.elasticsearch.work; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY; 023 024import java.util.ArrayList; 025import java.util.List; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentLocation; 031import org.nuxeo.ecm.core.api.DocumentModel; 032import org.nuxeo.ecm.core.query.sql.NXQL; 033import org.nuxeo.ecm.core.work.api.Work; 034import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 035import org.nuxeo.elasticsearch.commands.IndexingCommand; 036import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 037import org.nuxeo.elasticsearch.Timestamp; 038import org.nuxeo.runtime.api.Framework; 039import org.nuxeo.runtime.transaction.TransactionHelper; 040 041/** 042 * œ Worker to index a bucket of documents 043 * 044 * @since 7.1 045 */ 046public class BucketIndexingWorker extends BaseIndexingWorker implements Work { 047 private static final Log log = LogFactory.getLog(BucketIndexingWorker.class); 048 049 private static final long serialVersionUID = -4665673026513796882L; 050 051 private static final String DEFAULT_BUCKET_SIZE = "50"; 052 053 private final boolean warnAtEnd; 054 055 private final int documentCount; 056 057 public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean warnAtEnd) { 058 setDocuments(repositoryName, docIds); 059 documentCount = docIds.size(); 060 this.warnAtEnd = warnAtEnd; 061 } 062 063 @Override 064 public String getTitle() { 065 return " ElasticSearch bucket indexer size " + documentCount; 066 } 067 068 @Override 069 protected void doWork() { 070 ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class); 071 openSystemSession(); 072 int bucketSize = Math.min(documentCount, getBucketSize()); 073 List<String> ids = new ArrayList<>(bucketSize); 074 for (DocumentLocation doc : getDocuments()) { 075 ids.add(doc.getIdRef().value); 076 if ((ids.size() % bucketSize) == 0) { 077 esi.indexNonRecursive(getIndexingCommands(session, ids)); 078 ids.clear(); 079 TransactionHelper.commitOrRollbackTransaction(); 080 TransactionHelper.startTransaction(); 081 } 082 } 083 if (!ids.isEmpty()) { 084 esi.indexNonRecursive(getIndexingCommands(session, ids)); 085 ids.clear(); 086 } 087 if (warnAtEnd) { 088 log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath())); 089 } 090 } 091 092 private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) { 093 List<IndexingCommand> ret = new ArrayList<>(ids.size()); 094 long now = Timestamp.currentTimeMicros(); 095 for (DocumentModel doc : fetchDocuments(session, ids)) { 096 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false); 097 cmd.setOrder(now); 098 ret.add(cmd); 099 } 100 return ret; 101 } 102 103 private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) { 104 StringBuilder sb = new StringBuilder(); 105 sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN ("); 106 for (int i = 0; i < ids.size(); i++) { 107 sb.append(NXQL.escapeString(ids.get(i))); 108 if (i < ids.size() - 1) { 109 sb.append(", "); 110 } 111 } 112 sb.append(")"); 113 // read invalidation 114 session.save(); 115 return session.query(sb.toString()); 116 } 117 118 protected int getBucketSize() { 119 String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE); 120 return Integer.parseInt(value); 121 } 122 123}