/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;

/**
 * Turns {@link IndexingCommand}s into Elasticsearch index, delete and delete-by-query requests and executes them
 * with the client exposed by {@link ElasticSearchAdminImpl}.
 * <p>
 * Only the non recursive indexing entry points are implemented here; the worker based entry points throw
 * {@link UnsupportedOperationException}.
 *
 * @since 6.0
 */
public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    private final ElasticSearchAdminImpl esa;

    private final Timer deleteTimer;

    private final Timer indexTimer;

    private final Timer bulkIndexTimer;

    private final boolean useExternalVersion;

    private final JsonESDocumentWriter jsonESDocumentWriter;

    /**
     * Creates an indexing service that serializes documents with a default {@link JsonESDocumentWriter}.
     *
     * @param esa the admin service providing the client, the index names and the versioning option
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
        this(esa, new JsonESDocumentWriter()); // default writer
    }

    /**
     * Creates an indexing service that serializes documents with the given writer.
     *
     * @param esa the admin service providing the client, the index names and the versioning option
     * @param jsonESDocumentWriter the writer used to produce the JSON source of indexed documents
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        this.esa = esa;
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
        this.jsonESDocumentWriter = jsonESDocumentWriter;
        this.useExternalVersion = esa.useExternalVersion();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Processes a list of commands: delete commands are executed one by one, the remaining commands are submitted in
     * a single bulk request. A list holding exactly one command is delegated to
     * {@link #indexNonRecursive(IndexingCommand)}.
     *
     * @param cmds the commands to process
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        int nbCommands = cmds.size();
        if (nbCommands == 1) {
            indexNonRecursive(cmds.get(0));
            return;
        }
        // deletions are timed individually by deleteTimer, only the bulk indexing is timed by bulkIndexTimer
        processBulkDeleteCommands(cmds);
        Context stopWatch = bulkIndexTimer.time();
        try {
            processBulkIndexCommands(cmds);
        } finally {
            stopWatch.stop();
        }
        esa.totalCommandProcessed.addAndGet(nbCommands);
        refreshIfNeeded(cmds);
    }

    /**
     * Executes every {@link Type#DELETE} command of the list, one request per command, each one timed with the
     * delete timer. Other commands are ignored.
     */
    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
        // Can be optimized with a single delete by query
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE) {
                Context stopWatch = deleteTimer.time();
                try {
                    processDeleteCommand(cmd);
                } finally {
                    stopWatch.stop();
                }
            }
        }
    }

    /**
     * Builds and executes one bulk request holding an index request for each indexable command of the list.
     * <p>
     * {@link Type#DELETE} and {@link Type#UPDATE_DIRECT_CHILDREN} commands are skipped, and a target document id is
     * submitted only once. A command whose request cannot be built is logged and skipped so that it does not prevent
     * the other commands from being indexed. Nothing is sent when the resulting bulk request is empty.
     *
     * @throws ConcurrentUpdateException when building a request raises it, so that the caller can retry
     */
    void processBulkIndexCommands(List<IndexingCommand> cmds) {
        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
        Set<String> docIds = new HashSet<>(cmds.size());
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
                continue;
            }
            if (!docIds.add(cmd.getTargetDocumentId())) {
                // do not submit the same doc 2 times
                continue;
            }
            try {
                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
                if (idxRequest != null) {
                    bulkRequest.add(idxRequest);
                }
            } catch (ConcurrentUpdateException e) {
                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
            } catch (DocumentNotFoundException e) {
                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
            } catch (IllegalArgumentException | IllegalStateException e) {
                // IllegalStateException is the failure documented by buildEsIndexingRequest; handle it like the
                // single command path does instead of aborting the whole bulk
                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
            }
        }
        if (bulkRequest.numberOfActions() > 0) {
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
            }
            BulkResponse response = bulkRequest.execute().actionGet();
            if (response.hasFailures()) {
                logBulkFailure(response);
            }
        }
    }

    /**
     * Logs the failures of a bulk response. When every failed item has the {@link RestStatus#CONFLICT} status the
     * failure messages are logged at info level, otherwise the whole failure message of the response is logged at
     * error level.
     *
     * @param response a bulk response, expected to hold at least one failure
     */
    protected void logBulkFailure(BulkResponse response) {
        boolean isError = false;
        StringBuilder sb = new StringBuilder();
        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
                    sb.append("\n ").append(item.getFailureMessage());
                } else {
                    isError = true;
                }
            }
        }
        if (isError) {
            log.error(response.buildFailureMessage());
        } else {
            log.info(sb);
        }
    }

    /**
     * Refreshes at most once: stops at the first synchronous command of the list.
     */
    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
        for (IndexingCommand cmd : cmds) {
            if (refreshIfNeeded(cmd)) {
                return;
            }
        }
    }

    /**
     * Calls {@code esa.refresh()} when the command is synchronous.
     *
     * @return {@code true} if a refresh has been requested
     */
    private boolean refreshIfNeeded(IndexingCommand cmd) {
        if (cmd.isSync()) {
            esa.refresh();
            return true;
        }
        return false;
    }

    /**
     * Processes a single command: a {@link Type#DELETE} command removes the document, a
     * {@link Type#UPDATE_DIRECT_CHILDREN} command is a no-op, any other command indexes the target document. The
     * processed command counter is incremented even when the processing fails.
     *
     * @param cmd the command to process
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        Type type = cmd.getType();
        if (type == Type.UPDATE_DIRECT_CHILDREN) {
            // the parent don't need to be indexed
            return;
        }
        Context stopWatch = null;
        try {
            if (type == Type.DELETE) {
                stopWatch = deleteTimer.time();
                processDeleteCommand(cmd);
            } else {
                stopWatch = indexTimer.time();
                processIndexCommand(cmd);
            }
            refreshIfNeeded(cmd);
        } finally {
            if (stopWatch != null) {
                stopWatch.stop();
            }
            esa.totalCommandProcessed.incrementAndGet();
        }
    }

    /**
     * Indexes the target document of a command. The command is dropped, with a log, when the request cannot be
     * built, when the document does not exist anymore or when Elasticsearch reports a version conflict.
     */
    void processIndexCommand(IndexingCommand cmd) {
        IndexRequestBuilder request;
        try {
            request = buildEsIndexingRequest(cmd);
        } catch (DocumentNotFoundException e) {
            // handled below like a missing target document
            request = null;
        } catch (IllegalStateException e) {
            log.error("Fail to create request for indexing command: " + cmd, e);
            return;
        }
        if (request == null) {
            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
                    request.request().toString()));
        }
        try {
            request.execute().actionGet();
        } catch (VersionConflictEngineException e) {
            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
                    + " a more recent version has already been indexed: " + e.getMessage());
        }
    }

    /**
     * Dispatches a delete command to the recursive or non recursive implementation according to
     * {@code cmd.isRecurse()}.
     */
    void processDeleteCommand(IndexingCommand cmd) {
        if (cmd.isRecurse()) {
            processDeleteCommandRecursive(cmd);
        } else {
            processDeleteCommandNonRecursive(cmd);
        }
    }

    /**
     * Deletes the Elasticsearch document whose id is the target document id of the command.
     */
    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
                    DOC_TYPE, cmd.getTargetDocumentId()));
        }
        request.execute().actionGet();
    }

    /**
     * Deletes by query every Elasticsearch document whose {@code CHILDREN_FIELD} matches the path of the target
     * document. The path is read from Elasticsearch; when it cannot be found nothing is deleted. Shard failures are
     * logged at error level.
     */
    void processDeleteCommandRecursive(IndexingCommand cmd) {
        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
        // we don't want to rely on target document because the document can be
        // already removed
        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
        if (docPath == null) {
            if (!Framework.isTestModeSet()) {
                log.warn("Trying to delete a non existing doc: " + cmd);
            }
            return;
        }
        QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath));
        DeleteByQueryRequestBuilder deleteRequest = esa.getClient()
                                                       .prepareDeleteByQuery(indexName)
                                                       .setTypes(DOC_TYPE)
                                                       .setQuery(query);
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName,
                    DOC_TYPE, query.toString()));
        }
        DeleteByQueryResponse responses = deleteRequest.execute().actionGet();
        for (IndexDeleteByQueryResponse response : responses) {
            // there is no way to trace how many docs are removed
            if (response.getFailedShards() > 0) {
                log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(),
                        response.getTotalShards()));
            }
        }
    }

    /**
     * Return the ecm:path of an ES document or null if not found.
     *
     * @param repository the repository name used to resolve the index
     * @param docId the id of the document to look up
     */
    String getPathOfDocFromEs(String repository, String docId) {
        String indexName = esa.getIndexNameForRepository(repository);
        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'",
                    indexName, DOC_TYPE, docId, PATH_FIELD));
        }
        GetResponse ret = getRequest.execute().actionGet();
        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
            // doc not found
            return null;
        }
        return ret.getField(PATH_FIELD).getValue().toString();
    }

    /**
     * Return indexing request or null if the doc does not exists anymore.
     * <p>
     * When external versioning is enabled and the command order is positive, the order is used as the external
     * version of the request.
     *
     * @throws java.lang.IllegalStateException if the command is not attached to a session
     * @throws NuxeoException if the JSON source of the document cannot be written
     */
    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
        DocumentModel doc = cmd.getTargetDocument();
        if (doc == null) {
            return null;
        }
        try {
            JsonFactory factory = new JsonFactory();
            XContentBuilder builder = jsonBuilder();
            JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
            IndexRequestBuilder ret = esa.getClient()
                                         .prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()),
                                                 DOC_TYPE, cmd.getTargetDocumentId())
                                         .setSource(builder);
            if (useExternalVersion && cmd.getOrder() > 0) {
                ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder());
            }
            return ret;
        } catch (IOException e) {
            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
        }
    }

}