/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;

/**
 * {@link ElasticSearchIndexing} implementation that turns {@link IndexingCommand}s into Elasticsearch index, delete,
 * delete-by-query and bulk requests. The requests are executed with the client of an {@link ElasticSearchAdminImpl}.
 * <p>
 * Request durations are recorded in timers registered in the shared metric registry named after
 * {@link MetricsService}.
 *
 * @since 6.0
 */
public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    /**
     * Jackson factory shared by all indexing requests. It is never reconfigured after creation, so it can be reused
     * instead of allocating a new factory for every indexed document.
     */
    private static final JsonFactory JSON_FACTORY = new JsonFactory();

    private final ElasticSearchAdminImpl esa;

    private final Timer deleteTimer;

    private final Timer indexTimer;

    private final Timer bulkIndexTimer;

    private JsonESDocumentWriter jsonESDocumentWriter;

    /**
     * Creates an indexing service that serializes documents with a default {@link JsonESDocumentWriter}.
     *
     * @param esa admin service providing the Elasticsearch client, the repository to index name mapping and refresh
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
        this.esa = esa;
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
        this.jsonESDocumentWriter = new JsonESDocumentWriter(); // default writer
    }

    /**
     * Creates an indexing service that serializes documents with the given writer.
     *
     * @param esa admin service providing the Elasticsearch client, the repository to index name mapping and refresh
     * @param jsonESDocumentWriter writer used to produce the JSON source of each indexed document
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        this(esa);
        this.jsonESDocumentWriter = jsonESDocumentWriter;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Processes a batch of commands without recursing into their children.
     * <p>
     * A single command is delegated to {@link #indexNonRecursive(IndexingCommand)}. Otherwise:
     * <ul>
     * <li>DELETE commands are executed one by one;</li>
     * <li>the remaining indexing commands are sent in a single bulk request;</li>
     * <li>the index is refreshed once if any command of the batch is synchronous.</li>
     * </ul>
     *
     * @param cmds the commands to process
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        int nbCommands = cmds.size();
        if (nbCommands == 1) {
            indexNonRecursive(cmds.get(0));
            return;
        }
        processBulkDeleteCommands(cmds);
        Context stopWatch = bulkIndexTimer.time();
        try {
            processBulkIndexCommands(cmds);
        } finally {
            stopWatch.stop();
        }
        esa.totalCommandProcessed.addAndGet(nbCommands);
        refreshIfNeeded(cmds);
    }

    /**
     * Executes every DELETE command of the batch individually. Each delete is timed with the delete timer.
     */
    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
        // Can be optimized with a single delete by query
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE) {
                Context stopWatch = deleteTimer.time();
                try {
                    processDeleteCommand(cmd);
                } finally {
                    stopWatch.stop();
                }
            }
        }
    }

    /**
     * Sends all indexing commands of the batch in one bulk request. DELETE and UPDATE_DIRECT_CHILDREN commands are
     * skipped.
     * <p>
     * Handling of individual commands:
     * <ul>
     * <li>A {@link ConcurrentUpdateException} is rethrown.</li>
     * <li>A command whose document no longer exists is skipped with an info log.</li>
     * <li>A command whose request cannot be built (illegal argument or state) is skipped with an error log. This is
     * the same handling as {@link #processIndexCommand(IndexingCommand)}.</li>
     * </ul>
     * Failures reported in the bulk response are logged, not thrown.
     */
    void processBulkIndexCommands(List<IndexingCommand> cmds) {
        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
                continue;
            }
            try {
                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
                if (idxRequest != null) {
                    bulkRequest.add(idxRequest);
                }
            } catch (ConcurrentUpdateException e) {
                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
            } catch (DocumentNotFoundException e) {
                log.info("Skip indexing command to bulk, doc does not exists anymore: " + cmd);
            } catch (IllegalArgumentException e) {
                log.error("Skip indexing command to bulk, fail to create request: " + cmd, e);
            } catch (IllegalStateException e) {
                // documented failure of buildEsIndexingRequest: skip this command instead of failing the whole batch
                log.error("Skip indexing command to bulk, fail to create request: " + cmd, e);
            }
        }
        if (bulkRequest.numberOfActions() > 0) {
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
            }
            BulkResponse response = bulkRequest.execute().actionGet();
            if (response.hasFailures()) {
                log.error(response.buildFailureMessage());
            }
        }
    }

    /**
     * Refreshes the index at most once, and only if at least one command of the batch is synchronous.
     */
    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
        for (IndexingCommand cmd : cmds) {
            if (refreshIfNeeded(cmd)) {
                return;
            }
        }
    }

    /**
     * Refreshes the index when the command is synchronous.
     *
     * @return {@code true} if a refresh was performed
     */
    private boolean refreshIfNeeded(IndexingCommand cmd) {
        if (cmd.isSync()) {
            esa.refresh();
            return true;
        }
        return false;
    }

    /**
     * Processes a single command without recursing into its children.
     * <p>
     * UPDATE_DIRECT_CHILDREN commands are ignored. DELETE commands are timed with the delete timer and all other
     * commands with the index timer. The processed-command counter is incremented even if processing fails, but not
     * for ignored commands.
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        Type type = cmd.getType();
        if (type == Type.UPDATE_DIRECT_CHILDREN) {
            // the parent don't need to be indexed
            return;
        }
        Context stopWatch = null;
        try {
            if (type == Type.DELETE) {
                stopWatch = deleteTimer.time();
                processDeleteCommand(cmd);
            } else {
                stopWatch = indexTimer.time();
                processIndexCommand(cmd);
            }
            refreshIfNeeded(cmd);
        } finally {
            if (stopWatch != null) {
                stopWatch.stop();
            }
            esa.totalCommandProcessed.incrementAndGet();
        }
    }

    /**
     * Builds and executes the index request for a single command.
     * <p>
     * The command is cancelled with an info log when its document no longer exists. It is dropped with an error log
     * when the request cannot be built (illegal argument or state). This matches
     * {@link #processBulkIndexCommands(List)}.
     */
    void processIndexCommand(IndexingCommand cmd) {
        IndexRequestBuilder request;
        try {
            request = buildEsIndexingRequest(cmd);
        } catch (DocumentNotFoundException e) {
            request = null;
        } catch (IllegalStateException e) {
            log.error("Fail to create request for indexing command: " + cmd, e);
            return;
        } catch (IllegalArgumentException e) {
            log.error("Fail to create request for indexing command: " + cmd, e);
            return;
        }
        if (request == null) {
            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
                    request.request().toString()));
        }
        request.execute().actionGet();
    }

    /**
     * Dispatches a DELETE command to the recursive (delete by query) or non-recursive (delete by id) implementation,
     * depending on {@link IndexingCommand#isRecurse()}.
     */
    void processDeleteCommand(IndexingCommand cmd) {
        if (cmd.isRecurse()) {
            processDeleteCommandRecursive(cmd);
        } else {
            processDeleteCommandNonRecursive(cmd);
        }
    }

    /**
     * Deletes the target document from the repository's index by id.
     */
    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
                    DOC_TYPE, cmd.getTargetDocumentId()));
        }
        request.execute().actionGet();
    }

    /**
     * Deletes, with a delete-by-query, every indexed document whose children field matches the target document's
     * path.
     * <p>
     * The path is read from the index, not from the repository. If the index holds no path for the target, nothing
     * is deleted. A warning is logged, except in test mode. Shard failures are logged, not thrown.
     */
    void processDeleteCommandRecursive(IndexingCommand cmd) {
        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
        // we don't want to rely on target document because the document can be
        // already removed
        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
        if (docPath == null) {
            if (!Framework.isTestModeSet()) {
                log.warn("Trying to delete a non existing doc: " + cmd);
            }
            return;
        }
        QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath));
        DeleteByQueryRequestBuilder deleteRequest = esa.getClient().prepareDeleteByQuery(indexName).setTypes(DOC_TYPE).setQuery(
                query);
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName,
                    DOC_TYPE, query.toString()));
        }
        DeleteByQueryResponse responses = deleteRequest.execute().actionGet();
        for (IndexDeleteByQueryResponse response : responses) {
            // there is no way to trace how many docs are removed
            if (response.getFailedShards() > 0) {
                log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(),
                        response.getTotalShards()));
            }
        }
    }

    /**
     * Return the ecm:path of an ES document or null if not found.
     * <p>
     * {@code null} is also returned when the document exists but its path field is absent or has no value.
     */
    String getPathOfDocFromEs(String repository, String docId) {
        String indexName = esa.getIndexNameForRepository(repository);
        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'",
                    indexName, DOC_TYPE, docId, PATH_FIELD));
        }
        GetResponse ret = getRequest.execute().actionGet();
        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
            // doc not found
            return null;
        }
        Object path = ret.getField(PATH_FIELD).getValue();
        // a field without any value is treated as "not found" rather than failing with a NullPointerException
        return path == null ? null : path.toString();
    }

    /**
     * Return indexing request or null if the doc does not exists anymore.
     * <p>
     * The document is serialized with the configured {@link JsonESDocumentWriter}, restricted to the command's
     * schemas. An {@link IOException} raised during serialization is wrapped in a {@link NuxeoException}.
     * NOTE(review): callers also catch {@link DocumentNotFoundException}, presumably thrown by
     * {@code cmd.getTargetDocument()}; confirm.
     *
     * @throws java.lang.IllegalStateException if the command is not attached to a session
     */
    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
        DocumentModel doc = cmd.getTargetDocument();
        if (doc == null) {
            return null;
        }
        try {
            XContentBuilder builder = jsonBuilder();
            JsonGenerator jsonGen = JSON_FACTORY.createJsonGenerator(builder.stream());
            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
            // push anything still buffered in the generator into the builder's stream before the builder is used
            // as the request source; a no-op if the writer already flushed
            jsonGen.flush();
            return esa.getClient().prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
                    cmd.getTargetDocumentId()).setSource(builder);
        } catch (IOException e) {
            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
        }
    }

}