001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD; 025import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 026import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY; 027import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD; 028 029import java.io.IOException; 030import java.io.OutputStream; 031import java.util.HashSet; 032import java.util.List; 033import java.util.Set; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.codehaus.jackson.JsonFactory; 038import org.codehaus.jackson.JsonGenerator; 039import org.elasticsearch.action.bulk.BulkItemResponse; 040import org.elasticsearch.action.bulk.BulkRequestBuilder; 041import org.elasticsearch.action.bulk.BulkResponse; 042import org.elasticsearch.action.delete.DeleteRequestBuilder; 043import org.elasticsearch.action.get.GetRequestBuilder; 044import org.elasticsearch.action.get.GetResponse; 045import org.elasticsearch.action.index.IndexRequestBuilder; 046import org.elasticsearch.action.search.SearchRequestBuilder; 047import org.elasticsearch.action.search.SearchResponse; 048import org.elasticsearch.common.io.stream.BytesStreamOutput; 049import org.elasticsearch.common.unit.TimeValue; 050import org.elasticsearch.index.VersionType; 051import org.elasticsearch.index.engine.VersionConflictEngineException; 052import org.elasticsearch.index.query.QueryBuilder; 053import org.elasticsearch.index.query.QueryBuilders; 054import org.elasticsearch.rest.RestStatus; 055import org.elasticsearch.search.SearchHit; 056import org.nuxeo.common.logging.SequenceTracer; 057import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 058import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 059import org.nuxeo.ecm.core.api.DocumentModel; 060import org.nuxeo.ecm.core.api.DocumentNotFoundException; 061import org.nuxeo.ecm.core.api.NuxeoException; 062import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 063import org.nuxeo.elasticsearch.commands.IndexingCommand; 064import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 065import org.nuxeo.runtime.api.Framework; 066import org.nuxeo.runtime.metrics.MetricsService; 067 068import com.codahale.metrics.MetricRegistry; 069import com.codahale.metrics.SharedMetricRegistries; 070import com.codahale.metrics.Timer; 071import com.codahale.metrics.Timer.Context; 072 073/** 074 * @since 6.0 075 */ 076public class ElasticSearchIndexingImpl implements ElasticSearchIndexing { 077 private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class); 078 079 // debug curl line max size 080 private static final int MAX_CURL_LINE = 8 * 1024; 081 082 // send the bulk indexing command when this size is reached, optimal is 5-10m 083 private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024; 084 085 private final ElasticSearchAdminImpl esa; 086 087 private final Timer deleteTimer; 088 089 private final Timer indexTimer; 090 091 private final Timer bulkIndexTimer; 092 093 private final boolean useExternalVersion; 094 095 private JsonESDocumentWriter jsonESDocumentWriter; 096 097 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) { 098 this.esa = esa; 099 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 100 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 101 deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete")); 102 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 103 this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer 104 this.useExternalVersion = esa.useExternalVersion(); 105 } 106 107 /** 108 * @since 7.2 109 */ 110 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) { 111 this(esa); 112 this.jsonESDocumentWriter = jsonESDocumentWriter; 113 } 114 115 @Override 116 public void runIndexingWorker(List<IndexingCommand> cmds) { 117 throw new UnsupportedOperationException("Not implemented"); 118 } 119 120 @Override 121 public void runReindexingWorker(String repositoryName, String nxql) { 122 throw new UnsupportedOperationException("Not implemented"); 123 } 124 125 @Override 126 public void indexNonRecursive(List<IndexingCommand> cmds) { 127 int nbCommands = cmds.size(); 128 if (nbCommands == 1) { 129 indexNonRecursive(cmds.get(0)); 130 return; 131 } 132 // simulate long indexing 133 // try {Thread.sleep(1000);} catch (InterruptedException e) { } 134 135 processBulkDeleteCommands(cmds); 136 try (Context ignored = bulkIndexTimer.time()) { 137 processBulkIndexCommands(cmds); 138 } 139 esa.totalCommandProcessed.addAndGet(nbCommands); 140 refreshIfNeeded(cmds); 141 } 142 143 void processBulkDeleteCommands(List<IndexingCommand> cmds) { 144 // Can be optimized with a single delete by query 145 for (IndexingCommand cmd : cmds) { 146 if (cmd.getType() == Type.DELETE) { 147 try (Context ignored = deleteTimer.time()){ 148 processDeleteCommand(cmd); 149 } 150 } 151 } 152 } 153 154 void processBulkIndexCommands(List<IndexingCommand> cmds) { 155 BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk(); 156 Set<String> docIds = new HashSet<>(cmds.size()); 157 int bulkSize = 0; 158 final int maxBulkSize = getMaxBulkSize(); 159 for (IndexingCommand cmd : cmds) { 160 if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) { 161 continue; 162 } 163 if (!docIds.add(cmd.getTargetDocumentId())) { 164 // do not submit the same doc 2 times 165 continue; 166 } 167 try { 168 IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd); 169 if (idxRequest != null) { 170 bulkSize += idxRequest.request().source().length(); 171 bulkRequest.add(idxRequest); 172 } 173 } catch (ConcurrentUpdateException e) { 174 throw e; // bubble up, usually until AbstractWork catches it and maybe retries 175 } catch (DocumentNotFoundException e) { 176 log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd); 177 } catch (IllegalArgumentException e) { 178 log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e); 179 } 180 if (bulkSize > maxBulkSize) { 181 log.warn("Max bulk size reached " + bulkSize + ", sending bulk command"); 182 sendBulkCommand(bulkRequest, bulkSize); 183 bulkRequest = esa.getClient().prepareBulk(); 184 bulkSize = 0; 185 } 186 } 187 sendBulkCommand(bulkRequest, bulkSize); 188 } 189 190 int getMaxBulkSize() { 191 String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE)); 192 return Integer.parseInt(value); 193 } 194 195 void sendBulkCommand(BulkRequestBuilder bulkRequest, int bulkSize) { 196 if (bulkRequest.numberOfActions() > 0) { 197 if (log.isDebugEnabled()) { 198 logDebugMessageTruncated(String.format( 199 "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'", 200 bulkRequest.numberOfActions(), bulkSize, 201 bulkRequest.request().requests().toString()), MAX_CURL_LINE); 202 } 203 BulkResponse response = bulkRequest.execute().actionGet(); 204 if (response.hasFailures()) { 205 logBulkFailure(response); 206 } 207 } 208 } 209 210 void logBulkFailure(BulkResponse response) { 211 boolean isError = false; 212 StringBuilder sb = new StringBuilder(); 213 sb.append("Ignore indexing of some docs more recent versions has already been indexed"); 214 for (BulkItemResponse item : response.getItems()) { 215 if (item.isFailed()) { 216 if (item.getFailure().getStatus() == RestStatus.CONFLICT) { 217 sb.append("\n ").append(item.getFailureMessage()); 218 } else { 219 isError = true; 220 } 221 } 222 } 223 if (isError) { 224 log.error(response.buildFailureMessage()); 225 } else { 226 log.debug(sb); 227 } 228 } 229 230 void refreshIfNeeded(List<IndexingCommand> cmds) { 231 for (IndexingCommand cmd : cmds) { 232 if (refreshIfNeeded(cmd)) 233 return; 234 } 235 } 236 237 boolean refreshIfNeeded(IndexingCommand cmd) { 238 if (cmd.isSync()) { 239 esa.refresh(); 240 return true; 241 } 242 return false; 243 } 244 245 @Override 246 public void indexNonRecursive(IndexingCommand cmd) { 247 Type type = cmd.getType(); 248 if (type == Type.UPDATE_DIRECT_CHILDREN) { 249 // the parent don't need to be indexed 250 return; 251 } 252 if (type == Type.DELETE) { 253 try (Context ignored = deleteTimer.time()) { 254 processDeleteCommand(cmd); 255 } 256 } else { 257 try (Context ignored = indexTimer.time()) { 258 processIndexCommand(cmd); 259 } 260 } 261 refreshIfNeeded(cmd); 262 esa.totalCommandProcessed.incrementAndGet(); 263 } 264 265 void processIndexCommand(IndexingCommand cmd) { 266 IndexRequestBuilder request; 267 try { 268 request = buildEsIndexingRequest(cmd); 269 } catch (DocumentNotFoundException e) { 270 request = null; 271 } catch (IllegalStateException e) { 272 log.error("Fail to create request for indexing command: " + cmd, e); 273 return; 274 } 275 if (request == null) { 276 log.info("Cancel indexing command because target document does not exists anymore: " + cmd); 277 return; 278 } 279 if (log.isDebugEnabled()) { 280 logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'", 281 esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(), 282 request.request().toString()), MAX_CURL_LINE); 283 } 284 try { 285 request.execute().actionGet(); 286 } catch (VersionConflictEngineException e) { 287 SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId()); 288 log.info("Ignore indexing of doc " + cmd.getTargetDocumentId() 289 + " a more recent version has already been indexed: " + e.getMessage()); 290 } 291 } 292 293 void logDebugMessageTruncated(String msg, int maxSize) { 294 if (log.isTraceEnabled() || msg.length() < maxSize) { 295 // in trace mode we output the full message 296 log.debug(msg); 297 } else { 298 log.debug(msg.substring(0, maxSize) + "..."); 299 } 300 } 301 302 void processDeleteCommand(IndexingCommand cmd) { 303 if (cmd.isRecurse()) { 304 processDeleteCommandRecursive(cmd); 305 } else { 306 processDeleteCommandNonRecursive(cmd); 307 } 308 } 309 310 void processDeleteCommandNonRecursive(IndexingCommand cmd) { 311 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 312 DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId()); 313 if (log.isDebugEnabled()) { 314 log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName, 315 DOC_TYPE, cmd.getTargetDocumentId())); 316 } 317 request.execute().actionGet(); 318 } 319 320 void processDeleteCommandRecursive(IndexingCommand cmd) { 321 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 322 // we don't want to rely on target document because the document can be 323 // already removed 324 String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId()); 325 if (docPath == null) { 326 if (!Framework.isTestModeSet()) { 327 log.warn("Trying to delete a non existing doc: " + cmd.toString()); 328 } 329 return; 330 } 331 // Refresh index before bulk delete 332 esa.getClient().admin().indices().prepareRefresh(indexName).get(); 333 334 // Run the scroll query 335 QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath)); 336 TimeValue keepAlive = TimeValue.timeValueMinutes(1); 337 SearchRequestBuilder request = esa.getClient() 338 .prepareSearch(indexName) 339 .setTypes(DOC_TYPE) 340 .setScroll(keepAlive) 341 .setSize(100) 342 .setFetchSource(false) 343 .setQuery(query); 344 if (log.isDebugEnabled()) { 345 log.debug(String.format( 346 "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'", 347 indexName, DOC_TYPE, keepAlive, query.toString())); 348 } 349 for (SearchResponse response = request.execute().actionGet(); // 350 response.getHits().getHits().length > 0; // 351 response = runNextScroll(response, keepAlive)) { 352 353 // Build bulk delete request 354 BulkRequestBuilder bulkBuilder = esa.getClient().prepareBulk(); 355 for (SearchHit hit : response.getHits().getHits()) { 356 bulkBuilder.add(esa.getClient().prepareDelete(hit.getIndex(), hit.getType(), hit.getId())); 357 } 358 if (log.isDebugEnabled()) { 359 log.debug(String.format("Bulk delete request on %s elements", bulkBuilder.numberOfActions())); 360 } 361 // Run bulk delete request 362 bulkBuilder.execute().actionGet(); 363 } 364 } 365 366 SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) { 367 if (log.isDebugEnabled()) { 368 log.debug(String.format( 369 "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'", 370 keepAlive, response.getScrollId())); 371 } 372 return esa.getClient().prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); 373 } 374 375 /** 376 * Return the ecm:path of an ES document or null if not found. 377 */ 378 String getPathOfDocFromEs(String repository, String docId) { 379 String indexName = esa.getIndexNameForRepository(repository); 380 GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD); 381 if (log.isDebugEnabled()) { 382 log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName, 383 DOC_TYPE, docId, PATH_FIELD)); 384 } 385 GetResponse ret = getRequest.execute().actionGet(); 386 if (!ret.isExists() || ret.getField(PATH_FIELD) == null) { 387 // doc not found 388 return null; 389 } 390 return ret.getField(PATH_FIELD).getValue().toString(); 391 } 392 393 /** 394 * Return indexing request or null if the doc does not exists anymore. 395 * 396 * @throws java.lang.IllegalStateException if the command is not attached to a session 397 */ 398 IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) { 399 DocumentModel doc = cmd.getTargetDocument(); 400 if (doc == null) { 401 return null; 402 } 403 try { 404 JsonFactory factory = new JsonFactory(); 405 OutputStream out = new BytesStreamOutput(); 406 JsonGenerator jsonGen = factory.createJsonGenerator(out); 407 jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null); 408 IndexRequestBuilder ret = esa.getClient() 409 .prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, 410 cmd.getTargetDocumentId()) 411 .setSource(jsonBuilder(out)); 412 if (useExternalVersion && cmd.getOrder() > 0) { 413 ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder()); 414 } 415 return ret; 416 } catch (IOException e) { 417 throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e); 418 } 419 } 420 421}