/*
 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.model.BlobNotFoundException;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

/**
 * {@link ElasticSearchIndexing} implementation that turns {@link IndexingCommand}s into Elasticsearch index and
 * delete requests and submits them through the client exposed by {@link ElasticSearchAdminImpl}.
 * <p>
 * The worker and repository level operations ({@link #runIndexingWorker}, {@link #runReindexingWorker},
 * {@link #reindexRepository}) are not supported by this class and throw {@link UnsupportedOperationException}.
 *
 * @since 6.0
 */
public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    // debug curl line max size
    private static final int MAX_CURL_LINE = 8 * 1024;

    // send the bulk indexing command when this size is reached, optimal is 5-10m
    private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024;

    private final ElasticSearchAdminImpl esa;

    private final Timer deleteTimer;

    private final Timer indexTimer;

    private final Timer bulkIndexTimer;

    private final boolean useExternalVersion;

    private final JsonESDocumentWriter jsonESDocumentWriter;

    /**
     * Creates an indexing service that serializes documents with a default {@link JsonESDocumentWriter}.
     *
     * @param esa the admin service giving access to the client, the index names and the processed-command counter
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
        this(esa, new JsonESDocumentWriter());
    }

    /**
     * Creates an indexing service that serializes documents with the given writer.
     *
     * @param esa the admin service giving access to the client, the index names and the processed-command counter
     * @param jsonESDocumentWriter the writer used to produce the JSON source of each indexed document
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        this.esa = esa;
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
        this.jsonESDocumentWriter = jsonESDocumentWriter;
        this.useExternalVersion = esa.useExternalVersion();
    }

    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public void reindexRepository(String repositoryName) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Processes a list of commands: deletions are executed one by one first, then all remaining indexing commands
     * are submitted in bulk. A single-element list is delegated to {@link #indexNonRecursive(IndexingCommand)}.
     *
     * @param cmds the commands to process
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        int nbCommands = cmds.size();
        if (nbCommands == 1) {
            indexNonRecursive(cmds.get(0));
            return;
        }
        processBulkDeleteCommands(cmds);
        try (Context ignored = bulkIndexTimer.time()) {
            processBulkIndexCommands(cmds);
        }
        esa.totalCommandProcessed.addAndGet(nbCommands);
        refreshIfNeeded(cmds);
    }

    /**
     * Executes every {@link Type#DELETE} command of the list, timing each one with the delete timer.
     */
    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
        // Can be optimized with a single delete by query
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE) {
                try (Context ignored = deleteTimer.time()) {
                    processDeleteCommand(cmd);
                }
            }
        }
    }

    /**
     * Builds index requests for all commands that are neither {@link Type#DELETE} nor
     * {@link Type#UPDATE_DIRECT_CHILDREN} and sends them in one or more bulk requests. A bulk request is flushed as
     * soon as the accumulated source size exceeds {@link #getMaxBulkSize()}.
     * <p>
     * A command whose request cannot be built (missing blob, missing document, invalid argument or state) is logged
     * and skipped so that it does not abort the rest of the bulk; {@link ConcurrentUpdateException} is propagated.
     */
    void processBulkIndexCommands(List<IndexingCommand> cmds) {
        BulkRequest bulkRequest = new BulkRequest();
        Set<String> docIds = new HashSet<>(cmds.size());
        int bulkSize = 0;
        final int maxBulkSize = getMaxBulkSize();
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
                continue;
            }
            if (!docIds.add(cmd.getTargetDocumentId())) {
                // do not submit the same doc 2 times
                continue;
            }
            try {
                IndexRequest idxRequest = buildEsIndexingRequest(cmd);
                if (idxRequest != null) {
                    bulkSize += idxRequest.source().length();
                    bulkRequest.add(idxRequest);
                }
            } catch (BlobNotFoundException be) {
                log.info("Ignore indexing command in bulk, blob does not exists anymore: " + cmd);
            } catch (ConcurrentUpdateException e) {
                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
            } catch (DocumentNotFoundException e) {
                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
            } catch (IllegalArgumentException | IllegalStateException e) {
                // same request-creation failures as the single command path: skip this command only
                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
            }
            if (bulkSize > maxBulkSize) {
                log.warn("Max bulk size reached " + bulkSize + ", sending bulk command");
                sendBulkCommand(bulkRequest, bulkSize);
                bulkRequest = new BulkRequest();
                bulkSize = 0;
            }
        }
        sendBulkCommand(bulkRequest, bulkSize);
    }

    /**
     * Returns the size threshold above which a bulk request is flushed, read from the
     * {@code INDEX_BULK_MAX_SIZE_PROPERTY} framework property.
     *
     * @return the configured value, or {@code DEFAULT_MAX_BULK_SIZE} when the property is unset or is not a valid
     *         integer
     */
    int getMaxBulkSize() {
        String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE));
        if (value == null) {
            return DEFAULT_MAX_BULK_SIZE;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            // a malformed setting must not prevent indexing
            log.warn(String.format("Invalid value '%s' for property %s, using default %d", value,
                    INDEX_BULK_MAX_SIZE_PROPERTY, DEFAULT_MAX_BULK_SIZE));
            return DEFAULT_MAX_BULK_SIZE;
        }
    }

    /**
     * Submits the bulk request if it contains at least one action and logs the failures reported in the response.
     *
     * @param bulkRequest the request to send
     * @param bulkSize the accumulated source size of the request, only used for the debug message
     */
    void sendBulkCommand(BulkRequest bulkRequest, int bulkSize) {
        if (bulkRequest.numberOfActions() <= 0) {
            return;
        }
        if (log.isDebugEnabled()) {
            logDebugMessageTruncated(String.format(
                    "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
                    bulkRequest.numberOfActions(), bulkSize, bulkRequest.requests().toString()), MAX_CURL_LINE);
        }
        BulkResponse response = esa.getClient().bulk(bulkRequest);
        if (response.hasFailures()) {
            logBulkFailure(response);
        }
    }

    /**
     * Logs the failed items of a bulk response: at error level if any item failed with a status other than
     * {@link RestStatus#CONFLICT}, otherwise at debug level with the list of conflict messages.
     */
    void logBulkFailure(BulkResponse response) {
        boolean isError = false;
        StringBuilder sb = new StringBuilder();
        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
        for (BulkItemResponse item : response.getItems()) {
            if (!item.isFailed()) {
                continue;
            }
            if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
                sb.append("\n ").append(item.getFailureMessage());
            } else {
                isError = true;
            }
        }
        if (isError) {
            log.error(response.buildFailureMessage());
        } else {
            log.debug(sb);
        }
    }

    /**
     * Refreshes the index once if at least one command of the list is synchronous.
     */
    void refreshIfNeeded(List<IndexingCommand> cmds) {
        for (IndexingCommand cmd : cmds) {
            if (refreshIfNeeded(cmd)) {
                return;
            }
        }
    }

    /**
     * Refreshes the index if the command is synchronous.
     *
     * @return {@code true} if a refresh has been triggered
     */
    boolean refreshIfNeeded(IndexingCommand cmd) {
        if (cmd.isSync()) {
            esa.refresh();
            return true;
        }
        return false;
    }

    /**
     * Processes a single command: {@link Type#UPDATE_DIRECT_CHILDREN} is a no-op, {@link Type#DELETE} removes the
     * document(s), any other type indexes the target document.
     *
     * @param cmd the command to process
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        Type type = cmd.getType();
        if (type == Type.UPDATE_DIRECT_CHILDREN) {
            // the parent don't need to be indexed
            return;
        }
        if (type == Type.DELETE) {
            try (Context ignored = deleteTimer.time()) {
                processDeleteCommand(cmd);
            }
        } else {
            try (Context ignored = indexTimer.time()) {
                processIndexCommand(cmd);
            }
        }
        refreshIfNeeded(cmd);
        esa.totalCommandProcessed.incrementAndGet();
    }

    /**
     * Builds and submits the index request of a single command. The command is dropped with a log message when the
     * target document or blob is gone, when the request cannot be created, or when the client reports a
     * {@link ConcurrentUpdateException} on submission.
     */
    void processIndexCommand(IndexingCommand cmd) {
        IndexRequest request;
        try {
            request = buildEsIndexingRequest(cmd);
        } catch (BlobNotFoundException pe) {
            request = null;
        } catch (DocumentNotFoundException e) {
            request = null;
        } catch (IllegalArgumentException | IllegalStateException e) {
            // same request-creation failures as the bulk path
            log.error("Fail to create request for indexing command: " + cmd, e);
            return;
        }
        if (request == null) {
            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
            return;
        }
        if (log.isDebugEnabled()) {
            logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
                    getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
                    request.toString()), MAX_CURL_LINE);
        }
        try {
            esa.getClient().index(request);
        } catch (ConcurrentUpdateException e) {
            SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId());
            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
                    + " a more recent version has already been indexed: " + e.getMessage());
        }
    }

    /**
     * Logs a debug message, truncated to {@code maxSize} characters unless trace logging is enabled.
     */
    void logDebugMessageTruncated(String msg, int maxSize) {
        if (log.isTraceEnabled() || msg.length() < maxSize) {
            // in trace mode we output the full message
            log.debug(msg);
        } else {
            log.debug(msg.substring(0, maxSize) + "...");
        }
    }

    /**
     * Dispatches a delete command to the recursive or non recursive implementation.
     */
    void processDeleteCommand(IndexingCommand cmd) {
        if (cmd.isRecurse()) {
            processDeleteCommandRecursive(cmd);
        } else {
            processDeleteCommandNonRecursive(cmd);
        }
    }

    /**
     * Deletes the single Elasticsearch document matching the target document id of the command.
     */
    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        DeleteRequest request = new DeleteRequest(indexName, DOC_TYPE, cmd.getTargetDocumentId());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
                    DOC_TYPE, cmd.getTargetDocumentId()));
        }
        esa.getClient().delete(request);
    }

    /**
     * Deletes every Elasticsearch document whose {@code CHILDREN_FIELD} matches the path of the target document.
     * The path is read from Elasticsearch, matching documents are collected with a scroll search and removed with
     * one bulk delete per scroll page. Failures reported by a bulk delete are logged.
     */
    void processDeleteCommandRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        // we don't want to rely on target document because the document can be
        // already removed
        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
        if (docPath == null) {
            if (!Framework.isTestModeSet()) {
                log.warn("Trying to delete a non existing doc: " + cmd.toString());
            }
            return;
        }
        // Refresh index before bulk delete
        esa.getClient().refresh(indexName);

        // Run the scroll query
        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
        SearchSourceBuilder search = new SearchSourceBuilder().size(100).query(query).fetchSource(false);
        SearchRequest request = new SearchRequest(indexName).scroll(keepAlive).source(search);
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
                    indexName, DOC_TYPE, keepAlive, query.toString()));
        }
        SearchResponse response = esa.getClient().search(request);
        while (response.getHits().getHits().length > 0) {
            // Build bulk delete request
            BulkRequest bulkRequest = new BulkRequest();
            for (SearchHit hit : response.getHits().getHits()) {
                bulkRequest.add(new DeleteRequest(hit.getIndex(), hit.getType(), hit.getId()));
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Bulk delete request on %s elements", bulkRequest.numberOfActions()));
            }
            // Run bulk delete request and report what could not be deleted instead of dropping the response
            BulkResponse bulkResponse = esa.getClient().bulk(bulkRequest);
            if (bulkResponse.hasFailures()) {
                log.warn("Failures during recursive delete under " + docPath + ": "
                        + bulkResponse.buildFailureMessage());
            }
            response = runNextScroll(response, keepAlive);
        }
    }

    /**
     * Fetches the next page of a scroll search.
     *
     * @param response the previous response, providing the scroll id
     * @param keepAlive the scroll keep alive to request
     * @return the response holding the next page of hits
     */
    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
                    keepAlive, response.getScrollId()));
        }
        SearchScrollRequest request = new SearchScrollRequest(response.getScrollId()).scroll(keepAlive);
        return esa.getClient().searchScroll(request);
    }

    /**
     * Return the ecm:path of an ES document or null if not found.
     */
    String getPathOfDocFromEs(String repository, String docId) {
        String indexName = getWriteIndexForRepository(repository);
        GetRequest request = new GetRequest(indexName, DOC_TYPE, docId).fetchSourceContext(
                new FetchSourceContext(true, new String[] { PATH_FIELD }, null));
        if (log.isDebugEnabled()) {
            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
                    DOC_TYPE, docId, PATH_FIELD));
        }
        GetResponse ret = esa.getClient().get(request);
        if (!ret.isExists() || ret.getSource() == null) {
            // doc not found
            return null;
        }
        Object path = ret.getSource().get(PATH_FIELD);
        return path == null ? null : path.toString();
    }

    /**
     * Return indexing request or null if the doc does not exists anymore.
     * <p>
     * When external versioning is enabled and the command order is positive, the order is used as the external
     * version of the request.
     *
     * @throws java.lang.IllegalStateException if the command is not attached to a session
     * @throws NuxeoException if the JSON source of the document cannot be written
     */
    IndexRequest buildEsIndexingRequest(IndexingCommand cmd) {
        DocumentModel doc = cmd.getTargetDocument();
        if (doc == null) {
            return null;
        }
        try {
            JsonFactory factory = new JsonFactory();
            OutputStream out = new BytesStreamOutput();
            JsonGenerator jsonGen = factory.createGenerator(out);
            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
            IndexRequest request = new IndexRequest(getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE,
                    cmd.getTargetDocumentId()).source(jsonBuilder(out));
            if (useExternalVersion && cmd.getOrder() > 0) {
                request.versionType(VersionType.EXTERNAL).version(cmd.getOrder());
            }
            return request;
        } catch (IOException e) {
            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
        }
    }

    /**
     * Returns the name of the index to write to for the given repository, as resolved by the admin service.
     */
    protected String getWriteIndexForRepository(String repository) {
        return esa.getWriteIndexName(esa.getIndexNameForRepository(repository));
    }

}