001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Nuxeo 016 */ 017 018package org.nuxeo.elasticsearch.work; 019 020import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY; 021 022import java.util.ArrayList; 023import java.util.List; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.nuxeo.ecm.core.api.CoreSession; 028import org.nuxeo.ecm.core.api.DocumentLocation; 029import org.nuxeo.ecm.core.api.DocumentModel; 030import org.nuxeo.ecm.core.query.sql.NXQL; 031import org.nuxeo.ecm.core.work.api.Work; 032import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 033import org.nuxeo.elasticsearch.commands.IndexingCommand; 034import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 035import org.nuxeo.runtime.api.Framework; 036 037/** 038 * œ Worker to index a bucket of documents 039 * 040 * @since 7.1 041 */ 042public class BucketIndexingWorker extends BaseIndexingWorker implements Work { 043 private static final Log log = LogFactory.getLog(BucketIndexingWorker.class); 044 045 private static final long serialVersionUID = -4665673026513796882L; 046 047 private static final String DEFAULT_BUCKET_SIZE = "50"; 048 049 private final boolean warnAtEnd; 050 051 private final int documentCount; 052 053 public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean warnAtEnd) { 054 setDocuments(repositoryName, docIds); 055 documentCount = docIds.size(); 056 this.warnAtEnd = warnAtEnd; 057 } 058 059 @Override 060 public String getTitle() { 061 return " ElasticSearch bucket indexer size " + documentCount; 062 } 063 064 @Override 065 protected void doWork() { 066 ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class); 067 CoreSession session = initSession(repositoryName); 068 int bucketSize = Math.min(documentCount, getBucketSize()); 069 List<String> ids = new ArrayList<>(bucketSize); 070 for (DocumentLocation doc : getDocuments()) { 071 ids.add(doc.getIdRef().value); 072 if ((ids.size() % bucketSize) == 0) { 073 esi.indexNonRecursive(getIndexingCommands(session, ids)); 074 ids.clear(); 075 } 076 } 077 if (!ids.isEmpty()) { 078 esi.indexNonRecursive(getIndexingCommands(session, ids)); 079 ids.clear(); 080 } 081 if (warnAtEnd) { 082 log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath())); 083 } 084 } 085 086 private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) { 087 List<IndexingCommand> ret = new ArrayList<>(ids.size()); 088 for (DocumentModel doc : fetchDocuments(session, ids)) { 089 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false); 090 ret.add(cmd); 091 } 092 return ret; 093 } 094 095 private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) { 096 StringBuilder sb = new StringBuilder(); 097 sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN ("); 098 for (int i = 0; i < ids.size(); i++) { 099 sb.append(NXQL.escapeString(ids.get(i))); 100 if (i < ids.size() - 1) { 101 sb.append(", "); 102 } 103 } 104 sb.append(")"); 105 // read invalidation 106 session.save(); 107 return session.query(sb.toString()); 108 } 109 110 protected int getBucketSize() { 111 String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE); 112 return Integer.parseInt(value); 113 } 114 115}