/*
 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;

/**
 * Implementation of {@link ElasticSearchIndexing} that translates {@link IndexingCommand}s into Elasticsearch
 * index/delete/bulk requests executed through the client held by {@link ElasticSearchAdminImpl}.
 * <p>
 * Indexing and delete operations are timed via Codahale metrics; documents are serialized to JSON with a
 * {@link JsonESDocumentWriter}. The worker/reindexing entry points are not implemented here and throw
 * {@link UnsupportedOperationException}.
 *
 * @since 6.0
 */
public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    // debug curl line max size
    private static final int MAX_CURL_LINE = 8 * 1024;

    // send the bulk indexing command when this size is reached, optimal is 5-10m
    private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024;

    // admin component providing the ES client, index names and refresh/metrics hooks
    private final ElasticSearchAdminImpl esa;

    // metrics timers around delete, single index and bulk index operations
    private final Timer deleteTimer;

    private final Timer indexTimer;

    private final Timer bulkIndexTimer;

    // when true, index requests carry the command order as an external ES version
    private final boolean useExternalVersion;

    // serializer used to render a DocumentModel as an ES JSON document
    private JsonESDocumentWriter jsonESDocumentWriter;

    /**
     * Builds the indexing service on top of the given admin component, registering metrics timers and using the
     * default {@link JsonESDocumentWriter}.
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
        this.esa = esa;
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
        this.useExternalVersion = esa.useExternalVersion();
    }

    /**
     * Same as {@link #ElasticSearchIndexingImpl(ElasticSearchAdminImpl)} but with a custom document JSON writer.
     *
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        this(esa);
        this.jsonESDocumentWriter = jsonESDocumentWriter;
    }

    /** Not supported by this implementation. */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /** Not supported by this implementation. */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /** Not supported by this implementation. */
    @Override
    public void reindexRepository(String repositoryName) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Processes a batch of commands: deletes are executed one by one first, then the remaining commands are sent as
     * bulk index requests. A single-command list is delegated to {@link #indexNonRecursive(IndexingCommand)}.
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        int nbCommands = cmds.size();
        if (nbCommands == 1) {
            indexNonRecursive(cmds.get(0));
            return;
        }
        // simulate long indexing
        // try {Thread.sleep(1000);} catch (InterruptedException e) { }

        processBulkDeleteCommands(cmds);
        try (Context ignored = bulkIndexTimer.time()) {
            processBulkIndexCommands(cmds);
        }
        esa.totalCommandProcessed.addAndGet(nbCommands);
        refreshIfNeeded(cmds);
    }

    /**
     * Executes every DELETE command of the batch individually, each timed by the delete metric.
     */
    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
        // Can be optimized with a single delete by query
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE) {
                try (Context ignored = deleteTimer.time()) {
                    processDeleteCommand(cmd);
                }
            }
        }
    }

    /**
     * Builds bulk index requests for all non-DELETE, non-UPDATE_DIRECT_CHILDREN commands, deduplicating by target
     * document id, and flushes a bulk whenever its accumulated source size exceeds {@link #getMaxBulkSize()}.
     */
    void processBulkIndexCommands(List<IndexingCommand> cmds) {
        BulkRequest bulkRequest = new BulkRequest();
        Set<String> docIds = new HashSet<>(cmds.size());
        int bulkSize = 0;
        final int maxBulkSize = getMaxBulkSize();
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
                continue;
            }
            if (!docIds.add(cmd.getTargetDocumentId())) {
                // do not submit the same doc 2 times
                continue;
            }
            try {
                IndexRequest idxRequest = buildEsIndexingRequest(cmd);
                if (idxRequest != null) {
                    bulkSize += idxRequest.source().length();
                    bulkRequest.add(idxRequest);
                }
            } catch (ConcurrentUpdateException e) {
                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
            } catch (DocumentNotFoundException e) {
                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
            } catch (IllegalArgumentException e) {
                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
            }
            if (bulkSize > maxBulkSize) {
                log.warn("Max bulk size reached " + bulkSize + ", sending bulk command");
                sendBulkCommand(bulkRequest, bulkSize);
                bulkRequest = new BulkRequest();
                bulkSize = 0;
            }
        }
        // flush the remainder (sendBulkCommand is a no-op on an empty request)
        sendBulkCommand(bulkRequest, bulkSize);
    }

    /**
     * Returns the bulk flush threshold in bytes, read from the {@code INDEX_BULK_MAX_SIZE_PROPERTY} framework
     * property, defaulting to {@link #DEFAULT_MAX_BULK_SIZE}.
     */
    int getMaxBulkSize() {
        String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE));
        return Integer.parseInt(value);
    }

    /**
     * Sends a non-empty bulk request and logs (error level for non-conflict failures) any item failures reported in
     * the response. Empty requests are skipped.
     */
    void sendBulkCommand(BulkRequest bulkRequest, int bulkSize) {
        if (bulkRequest.numberOfActions() > 0) {
            if (log.isDebugEnabled()) {
                logDebugMessageTruncated(String.format(
                        "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
                        bulkRequest.numberOfActions(), bulkSize, bulkRequest.requests().toString()), MAX_CURL_LINE);
            }
            BulkResponse response = esa.getClient().bulk(bulkRequest);
            if (response.hasFailures()) {
                logBulkFailure(response);
            }
        }
    }

    /**
     * Logs bulk item failures: version CONFLICT failures (an already-indexed more recent version) are only debug
     * noise, anything else is reported as an error with the full failure message.
     */
    void logBulkFailure(BulkResponse response) {
        boolean isError = false;
        StringBuilder sb = new StringBuilder();
        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
                    sb.append("\n  ").append(item.getFailureMessage());
                } else {
                    isError = true;
                }
            }
        }
        if (isError) {
            log.error(response.buildFailureMessage());
        } else {
            log.debug(sb);
        }
    }

    /**
     * Triggers a single index refresh if at least one command of the batch is synchronous.
     */
    void refreshIfNeeded(List<IndexingCommand> cmds) {
        for (IndexingCommand cmd : cmds) {
            // one refresh covers the whole batch, stop at the first sync command
            if (refreshIfNeeded(cmd))
                return;
        }
    }

    /**
     * Refreshes the index when the command is synchronous.
     *
     * @return {@code true} if a refresh was performed
     */
    boolean refreshIfNeeded(IndexingCommand cmd) {
        if (cmd.isSync()) {
            esa.refresh();
            return true;
        }
        return false;
    }

    /**
     * Processes a single command: DELETE goes through {@link #processDeleteCommand(IndexingCommand)},
     * UPDATE_DIRECT_CHILDREN is a no-op here, anything else is indexed. Each operation is timed and counted.
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        Type type = cmd.getType();
        if (type == Type.UPDATE_DIRECT_CHILDREN) {
            // the parent doesn't need to be indexed
            return;
        }
        if (type == Type.DELETE) {
            try (Context ignored = deleteTimer.time()) {
                processDeleteCommand(cmd);
            }
        } else {
            try (Context ignored = indexTimer.time()) {
                processIndexCommand(cmd);
            }
        }
        refreshIfNeeded(cmd);
        esa.totalCommandProcessed.incrementAndGet();
    }

    /**
     * Builds and sends a single index request for the command. A missing target document cancels the command;
     * a version conflict (a more recent version already indexed) is ignored.
     */
    void processIndexCommand(IndexingCommand cmd) {
        IndexRequest request;
        try {
            request = buildEsIndexingRequest(cmd);
        } catch (DocumentNotFoundException e) {
            request = null;
        } catch (IllegalStateException e) {
            log.error("Fail to create request for indexing command: " + cmd, e);
            return;
        }
        if (request == null) {
            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
            return;
        }
        if (log.isDebugEnabled()) {
            logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
                    getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
                    request.toString()), MAX_CURL_LINE);
        }
        try {
            esa.getClient().index(request);
        } catch (VersionConflictEngineException e) {
            SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId());
            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
                    + " a more recent version has already been indexed: " + e.getMessage());
        }
    }

    /**
     * Debug-logs {@code msg}, truncated to {@code maxSize} characters unless trace level is enabled (in which case
     * the full message is emitted).
     */
    void logDebugMessageTruncated(String msg, int maxSize) {
        if (log.isTraceEnabled() || msg.length() < maxSize) {
            // in trace mode we output the full message
            log.debug(msg);
        } else {
            log.debug(msg.substring(0, maxSize) + "...");
        }
    }

    /**
     * Dispatches a delete command to its recursive (document and all its children) or non-recursive variant.
     */
    void processDeleteCommand(IndexingCommand cmd) {
        if (cmd.isRecurse()) {
            processDeleteCommandRecursive(cmd);
        } else {
            processDeleteCommandNonRecursive(cmd);
        }
    }

    /**
     * Deletes the single target document from the repository's write index.
     */
    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        DeleteRequest request = new DeleteRequest(indexName, DOC_TYPE, cmd.getTargetDocumentId());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
                    DOC_TYPE, cmd.getTargetDocumentId()));
        }
        esa.getClient().delete(request);
    }

    /**
     * Deletes the target document's entire subtree: resolves the document path from ES (the repository document may
     * already be gone), then scrolls over all documents whose {@code CHILDREN_FIELD} matches that path, bulk-deleting
     * each page of hits.
     */
    void processDeleteCommandRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        // we don't want to rely on target document because the document can be
        // already removed
        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
        if (docPath == null) {
            if (!Framework.isTestModeSet()) {
                log.warn("Trying to delete a non existing doc: " + cmd.toString());
            }
            return;
        }
        // Refresh index before bulk delete
        esa.getClient().refresh(indexName);

        // Run the scroll query
        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
        // fetchSource(false): only ids are needed to build the delete requests
        SearchSourceBuilder search = new SearchSourceBuilder().size(100).query(query).fetchSource(false);
        SearchRequest request = new SearchRequest(indexName).scroll(keepAlive).source(search);
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
                    indexName, DOC_TYPE, keepAlive, query.toString()));
        }
        for (SearchResponse response = esa.getClient().search(request); //
                response.getHits().getHits().length > 0; //
                response = runNextScroll(response, keepAlive)) {

            // Build bulk delete request
            BulkRequest bulkRequest = new BulkRequest();
            for (SearchHit hit : response.getHits().getHits()) {
                bulkRequest.add(new DeleteRequest(hit.getIndex(), hit.getType(), hit.getId()));
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Bulk delete request on %s elements", bulkRequest.numberOfActions()));
            }
            // Run bulk delete request
            esa.getClient().bulk(bulkRequest);
        }
        // NOTE(review): the scroll context is left to expire with keepAlive rather than being
        // cleared explicitly — confirm the client wrapper has no clearScroll support here
    }

    /**
     * Fetches the next page of an open scroll using the scroll id of the previous response.
     */
    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
                    keepAlive, response.getScrollId()));
        }
        SearchScrollRequest request = new SearchScrollRequest(response.getScrollId()).scroll(keepAlive);
        return esa.getClient().searchScroll(request);
    }

    /**
     * Return the ecm:path of an ES document or null if not found.
     */
    String getPathOfDocFromEs(String repository, String docId) {
        String indexName = getWriteIndexForRepository(repository);
        // only fetch the path field from the source
        GetRequest request = new GetRequest(indexName, DOC_TYPE, docId).fetchSourceContext(
                new FetchSourceContext(true, new String[] { PATH_FIELD }, null));
        if (log.isDebugEnabled()) {
            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
                    DOC_TYPE, docId, PATH_FIELD));
        }
        GetResponse ret = esa.getClient().get(request);
        if (!ret.isExists() || ret.getSource() == null || ret.getSource().get(PATH_FIELD) == null) {
            // doc not found
            return null;
        }
        return ret.getSource().get(PATH_FIELD).toString();
    }

    /**
     * Return indexing request or null if the doc does not exist anymore.
     * <p>
     * The document is serialized to JSON via the configured {@link JsonESDocumentWriter}; when external versioning is
     * enabled and the command carries a positive order, that order is used as the ES external version.
     *
     * @throws java.lang.IllegalStateException if the command is not attached to a session
     */
    IndexRequest buildEsIndexingRequest(IndexingCommand cmd) {
        DocumentModel doc = cmd.getTargetDocument();
        if (doc == null) {
            return null;
        }
        try {
            JsonFactory factory = new JsonFactory();
            OutputStream out = new BytesStreamOutput();
            JsonGenerator jsonGen = factory.createJsonGenerator(out);
            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
            IndexRequest request = new IndexRequest(getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE,
                    cmd.getTargetDocumentId()).source(jsonBuilder(out));
            if (useExternalVersion && cmd.getOrder() > 0) {
                // external versioning lets ES reject stale updates with a version conflict
                request.versionType(VersionType.EXTERNAL).version(cmd.getOrder());
            }
            return request;
        } catch (IOException e) {
            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
        }
    }

    /**
     * Resolves the write index (alias) name for the given repository.
     */
    protected String getWriteIndexForRepository(String repository) {
        return esa.getWriteIndexName(esa.getIndexNameForRepository(repository));
    }

}