001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD; 025import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 026import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD; 027 028import java.io.IOException; 029import java.io.OutputStream; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Set; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.codehaus.jackson.JsonFactory; 037import org.codehaus.jackson.JsonGenerator; 038import org.elasticsearch.action.bulk.BulkItemResponse; 039import org.elasticsearch.action.bulk.BulkRequestBuilder; 040import org.elasticsearch.action.bulk.BulkResponse; 041import org.elasticsearch.action.delete.DeleteRequestBuilder; 042import org.elasticsearch.action.get.GetRequestBuilder; 043import org.elasticsearch.action.get.GetResponse; 044import org.elasticsearch.action.index.IndexRequestBuilder; 045import org.elasticsearch.action.search.SearchRequestBuilder; 046import org.elasticsearch.action.search.SearchResponse; 047import org.elasticsearch.common.io.stream.BytesStreamOutput; 048import org.elasticsearch.common.unit.TimeValue; 049import org.elasticsearch.index.VersionType; 050import org.elasticsearch.index.engine.VersionConflictEngineException; 051import org.elasticsearch.index.query.QueryBuilder; 052import org.elasticsearch.index.query.QueryBuilders; 053import org.elasticsearch.rest.RestStatus; 054import org.elasticsearch.search.SearchHit; 055import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 056import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 057import org.nuxeo.ecm.core.api.DocumentModel; 058import org.nuxeo.ecm.core.api.DocumentNotFoundException; 059import org.nuxeo.ecm.core.api.NuxeoException; 060import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 061import org.nuxeo.elasticsearch.commands.IndexingCommand; 062import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 063import org.nuxeo.runtime.api.Framework; 064import org.nuxeo.runtime.metrics.MetricsService; 065 066import com.codahale.metrics.MetricRegistry; 067import com.codahale.metrics.SharedMetricRegistries; 068import com.codahale.metrics.Timer; 069import com.codahale.metrics.Timer.Context; 070 071/** 072 * @since 6.0 073 */ 074public class ElasticSearchIndexingImpl implements ElasticSearchIndexing { 075 private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class); 076 077 private final ElasticSearchAdminImpl esa; 078 079 private final Timer deleteTimer; 080 081 private final Timer indexTimer; 082 083 private final Timer bulkIndexTimer; 084 085 private final boolean useExternalVersion; 086 087 private JsonESDocumentWriter jsonESDocumentWriter; 088 089 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) { 090 this.esa = esa; 091 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 092 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 093 deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete")); 094 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 095 this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer 096 this.useExternalVersion = esa.useExternalVersion(); 097 } 098 099 /** 100 * @since 7.2 101 */ 102 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) { 103 this(esa); 104 this.jsonESDocumentWriter = jsonESDocumentWriter; 105 } 106 107 @Override 108 public void runIndexingWorker(List<IndexingCommand> cmds) { 109 throw new UnsupportedOperationException("Not implemented"); 110 } 111 112 @Override 113 public void runReindexingWorker(String repositoryName, String nxql) { 114 throw new UnsupportedOperationException("Not implemented"); 115 } 116 117 @Override 118 public void indexNonRecursive(List<IndexingCommand> cmds) { 119 int nbCommands = cmds.size(); 120 if (nbCommands == 1) { 121 indexNonRecursive(cmds.get(0)); 122 return; 123 } 124 // simulate long indexing 125 // try {Thread.sleep(1000);} catch (InterruptedException e) { } 126 127 processBulkDeleteCommands(cmds); 128 Context stopWatch = bulkIndexTimer.time(); 129 try { 130 processBulkIndexCommands(cmds); 131 } finally { 132 stopWatch.stop(); 133 } 134 esa.totalCommandProcessed.addAndGet(nbCommands); 135 refreshIfNeeded(cmds); 136 } 137 138 void processBulkDeleteCommands(List<IndexingCommand> cmds) { 139 // Can be optimized with a single delete by query 140 for (IndexingCommand cmd : cmds) { 141 if (cmd.getType() == Type.DELETE) { 142 Context stopWatch = deleteTimer.time(); 143 try { 144 processDeleteCommand(cmd); 145 } finally { 146 stopWatch.stop(); 147 } 148 } 149 } 150 } 151 152 void processBulkIndexCommands(List<IndexingCommand> cmds) { 153 BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk(); 154 Set<String> docIds = new HashSet<>(cmds.size()); 155 for (IndexingCommand cmd : cmds) { 156 if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) { 157 continue; 158 } 159 if (!docIds.add(cmd.getTargetDocumentId())) { 160 // do not submit the same doc 2 times 161 continue; 162 } 163 try { 164 IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd); 165 if (idxRequest != null) { 166 bulkRequest.add(idxRequest); 167 } 168 } catch (ConcurrentUpdateException e) { 169 throw e; // bubble up, usually until AbstractWork catches it and maybe retries 170 } catch (DocumentNotFoundException e) { 171 log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd); 172 } catch (IllegalArgumentException e) { 173 log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e); 174 } 175 } 176 if (bulkRequest.numberOfActions() > 0) { 177 if (log.isDebugEnabled()) { 178 log.debug(String.format( 179 "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'", 180 bulkRequest.numberOfActions(), bulkRequest.request().requests().toString())); 181 } 182 BulkResponse response = bulkRequest.execute().actionGet(); 183 if (response.hasFailures()) { 184 logBulkFailure(response); 185 } 186 } 187 } 188 189 protected void logBulkFailure(BulkResponse response) { 190 boolean isError = false; 191 StringBuilder sb = new StringBuilder(); 192 sb.append("Ignore indexing of some docs more recent versions has already been indexed"); 193 for (BulkItemResponse item : response.getItems()) { 194 if (item.isFailed()) { 195 if (item.getFailure().getStatus() == RestStatus.CONFLICT) { 196 sb.append("\n ").append(item.getFailureMessage()); 197 } else { 198 isError = true; 199 } 200 } 201 } 202 if (isError) { 203 log.error(response.buildFailureMessage()); 204 } else { 205 log.info(sb); 206 } 207 } 208 209 protected void refreshIfNeeded(List<IndexingCommand> cmds) { 210 for (IndexingCommand cmd : cmds) { 211 if (refreshIfNeeded(cmd)) 212 return; 213 } 214 } 215 216 private boolean refreshIfNeeded(IndexingCommand cmd) { 217 if (cmd.isSync()) { 218 esa.refresh(); 219 return true; 220 } 221 return false; 222 } 223 224 @Override 225 public void indexNonRecursive(IndexingCommand cmd) { 226 Type type = cmd.getType(); 227 if (type == Type.UPDATE_DIRECT_CHILDREN) { 228 // the parent don't need to be indexed 229 return; 230 } 231 Context stopWatch = null; 232 try { 233 if (type == Type.DELETE) { 234 stopWatch = deleteTimer.time(); 235 processDeleteCommand(cmd); 236 } else { 237 stopWatch = indexTimer.time(); 238 processIndexCommand(cmd); 239 } 240 refreshIfNeeded(cmd); 241 } finally { 242 if (stopWatch != null) { 243 stopWatch.stop(); 244 } 245 esa.totalCommandProcessed.incrementAndGet(); 246 } 247 } 248 249 void processIndexCommand(IndexingCommand cmd) { 250 IndexRequestBuilder request; 251 try { 252 request = buildEsIndexingRequest(cmd); 253 } catch (DocumentNotFoundException e) { 254 request = null; 255 } catch (IllegalStateException e) { 256 log.error("Fail to create request for indexing command: " + cmd, e); 257 return; 258 } 259 if (request == null) { 260 log.info("Cancel indexing command because target document does not exists anymore: " + cmd); 261 return; 262 } 263 if (log.isDebugEnabled()) { 264 log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'", 265 esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(), 266 request.request().toString())); 267 } 268 try { 269 request.execute().actionGet(); 270 } catch (VersionConflictEngineException e) { 271 log.info("Ignore indexing of doc " + cmd.getTargetDocumentId() 272 + " a more recent version has already been indexed: " + e.getMessage()); 273 } 274 } 275 276 void processDeleteCommand(IndexingCommand cmd) { 277 if (cmd.isRecurse()) { 278 processDeleteCommandRecursive(cmd); 279 } else { 280 processDeleteCommandNonRecursive(cmd); 281 } 282 } 283 284 void processDeleteCommandNonRecursive(IndexingCommand cmd) { 285 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 286 DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId()); 287 if (log.isDebugEnabled()) { 288 log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName, 289 DOC_TYPE, cmd.getTargetDocumentId())); 290 } 291 request.execute().actionGet(); 292 } 293 294 void processDeleteCommandRecursive(IndexingCommand cmd) { 295 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 296 // we don't want to rely on target document because the document can be 297 // already removed 298 String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId()); 299 if (docPath == null) { 300 if (!Framework.isTestModeSet()) { 301 log.warn("Trying to delete a non existing doc: " + cmd.toString()); 302 } 303 return; 304 } 305 // Refresh index before bulk delete 306 esa.getClient().admin().indices().prepareRefresh(indexName).get(); 307 308 // Run the scroll query 309 QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath)); 310 TimeValue keepAlive = TimeValue.timeValueMinutes(1); 311 SearchRequestBuilder request = esa.getClient() 312 .prepareSearch(indexName) 313 .setTypes(DOC_TYPE) 314 .setScroll(keepAlive) 315 .setSize(100) 316 .setFetchSource(false) 317 .setQuery(query); 318 if (log.isDebugEnabled()) { 319 log.debug(String.format( 320 "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'", 321 indexName, DOC_TYPE, keepAlive, query.toString())); 322 } 323 for (SearchResponse response = request.execute().actionGet(); // 324 response.getHits().getHits().length > 0; // 325 response = runNextScroll(response, keepAlive)) { 326 327 // Build bulk delete request 328 BulkRequestBuilder bulkBuilder = esa.getClient().prepareBulk(); 329 for (SearchHit hit : response.getHits().getHits()) { 330 bulkBuilder.add(esa.getClient().prepareDelete(hit.getIndex(), hit.getType(), hit.getId())); 331 } 332 if (log.isDebugEnabled()) { 333 log.debug(String.format("Bulk delete request on %s elements", bulkBuilder.numberOfActions())); 334 } 335 // Run bulk delete request 336 bulkBuilder.execute().actionGet(); 337 } 338 } 339 340 SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) { 341 if (log.isDebugEnabled()) { 342 log.debug(String.format( 343 "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'", 344 keepAlive, response.getScrollId())); 345 } 346 return esa.getClient().prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); 347 } 348 349 /** 350 * Return the ecm:path of an ES document or null if not found. 351 */ 352 String getPathOfDocFromEs(String repository, String docId) { 353 String indexName = esa.getIndexNameForRepository(repository); 354 GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD); 355 if (log.isDebugEnabled()) { 356 log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName, 357 DOC_TYPE, docId, PATH_FIELD)); 358 } 359 GetResponse ret = getRequest.execute().actionGet(); 360 if (!ret.isExists() || ret.getField(PATH_FIELD) == null) { 361 // doc not found 362 return null; 363 } 364 return ret.getField(PATH_FIELD).getValue().toString(); 365 } 366 367 /** 368 * Return indexing request or null if the doc does not exists anymore. 369 * 370 * @throws java.lang.IllegalStateException if the command is not attached to a session 371 */ 372 IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) { 373 DocumentModel doc = cmd.getTargetDocument(); 374 if (doc == null) { 375 return null; 376 } 377 try { 378 JsonFactory factory = new JsonFactory(); 379 OutputStream out = new BytesStreamOutput(); 380 JsonGenerator jsonGen = factory.createJsonGenerator(out); 381 jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null); 382 IndexRequestBuilder ret = esa.getClient() 383 .prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, 384 cmd.getTargetDocumentId()) 385 .setSource(jsonBuilder(out)); 386 if (useExternalVersion && cmd.getOrder() > 0) { 387 ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder()); 388 } 389 return ret; 390 } catch (IOException e) { 391 throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e); 392 } 393 } 394 395}