001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo 018 */ 019 020package org.nuxeo.elasticsearch.work; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY; 023 024import java.util.ArrayList; 025import java.util.List; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentLocation; 031import org.nuxeo.ecm.core.api.DocumentModel; 032import org.nuxeo.ecm.core.query.sql.NXQL; 033import org.nuxeo.ecm.core.work.api.Work; 034import org.nuxeo.elasticsearch.Timestamp; 035import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 036import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 037import org.nuxeo.elasticsearch.commands.IndexingCommand; 038import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 039import org.nuxeo.runtime.api.Framework; 040import org.nuxeo.runtime.transaction.TransactionHelper; 041 042/** 043 * œ Worker to index a bucket of documents 044 * 045 * @since 7.1 046 */ 047public class BucketIndexingWorker extends BaseIndexingWorker implements Work { 048 private static final Log log = LogFactory.getLog(BucketIndexingWorker.class); 049 050 private static final long serialVersionUID = -4665673026513796882L; 051 052 private static final String DEFAULT_BUCKET_SIZE = "50"; 053 054 private final boolean syncAlias; 055 056 private final int documentCount; 057 058 public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean syncAlias) { 059 setDocuments(repositoryName, docIds); 060 documentCount = docIds.size(); 061 this.syncAlias = syncAlias; 062 } 063 064 @Override 065 public String getTitle() { 066 return " ElasticSearch bucket indexer size " + documentCount; 067 } 068 069 @Override 070 protected void doWork() { 071 ElasticSearchIndexing esi = Framework.getService(ElasticSearchIndexing.class); 072 openSystemSession(); 073 int bucketSize = Math.min(documentCount, getBucketSize()); 074 List<String> ids = new ArrayList<>(bucketSize); 075 for (DocumentLocation doc : getDocuments()) { 076 ids.add(doc.getIdRef().value); 077 if ((ids.size() % bucketSize) == 0) { 078 esi.indexNonRecursive(getIndexingCommands(session, ids)); 079 ids.clear(); 080 TransactionHelper.commitOrRollbackTransaction(); 081 TransactionHelper.startTransaction(); 082 } 083 } 084 if (!ids.isEmpty()) { 085 esi.indexNonRecursive(getIndexingCommands(session, ids)); 086 ids.clear(); 087 } 088 if (syncAlias) { 089 log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath())); 090 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 091 esa.syncSearchAndWriteAlias(esa.getIndexNameForRepository(repositoryName)); 092 } 093 } 094 095 private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) { 096 List<IndexingCommand> ret = new ArrayList<>(ids.size()); 097 long now = Timestamp.currentTimeMicros(); 098 for (DocumentModel doc : fetchDocuments(session, ids)) { 099 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false); 100 cmd.setOrder(now); 101 ret.add(cmd); 102 } 103 return ret; 104 } 105 106 private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) { 107 StringBuilder sb = new StringBuilder(); 108 sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN ("); 109 for (int i = 0; i < ids.size(); i++) { 110 sb.append(NXQL.escapeString(ids.get(i))); 111 if (i < ids.size() - 1) { 112 sb.append(", "); 113 } 114 } 115 sb.append(")"); 116 // read invalidation 117 session.save(); 118 return session.query(sb.toString()); 119 } 120 121 protected int getBucketSize() { 122 String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE); 123 return Integer.parseInt(value); 124 } 125 126}