001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Tiry 016 * bdelbosc 017 */ 018 019package org.nuxeo.elasticsearch.core; 020 021import com.codahale.metrics.MetricRegistry; 022import com.codahale.metrics.SharedMetricRegistries; 023import com.codahale.metrics.Timer; 024import com.codahale.metrics.Timer.Context; 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.codehaus.jackson.JsonFactory; 028import org.codehaus.jackson.JsonGenerator; 029import org.elasticsearch.action.bulk.BulkRequestBuilder; 030import org.elasticsearch.action.bulk.BulkResponse; 031import org.elasticsearch.action.delete.DeleteRequestBuilder; 032import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; 033import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; 034import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; 035import org.elasticsearch.action.get.GetRequestBuilder; 036import org.elasticsearch.action.get.GetResponse; 037import org.elasticsearch.action.index.IndexRequestBuilder; 038import org.elasticsearch.common.xcontent.XContentBuilder; 039import org.elasticsearch.index.query.FilterBuilders; 040import org.elasticsearch.index.query.QueryBuilder; 041import org.elasticsearch.index.query.QueryBuilders; 042import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 043import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 044import org.nuxeo.ecm.core.api.DocumentModel; 045import org.nuxeo.ecm.core.api.DocumentNotFoundException; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 048import org.nuxeo.elasticsearch.commands.IndexingCommand; 049import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 050import org.nuxeo.runtime.api.Framework; 051import org.nuxeo.runtime.metrics.MetricsService; 052 053import java.io.IOException; 054import java.util.List; 055 056import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 057import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD; 058import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 059import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD; 060 061/** 062 * @since 6.0 063 */ 064public class ElasticSearchIndexingImpl implements ElasticSearchIndexing { 065 private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class); 066 067 private final ElasticSearchAdminImpl esa; 068 069 private final Timer deleteTimer; 070 071 private final Timer indexTimer; 072 073 private final Timer bulkIndexTimer; 074 075 private JsonESDocumentWriter jsonESDocumentWriter; 076 077 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) { 078 this.esa = esa; 079 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 080 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 081 deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete")); 082 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 083 this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer 084 } 085 086 /** 087 * @since 7.2 088 */ 089 public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) { 090 this(esa); 091 this.jsonESDocumentWriter = jsonESDocumentWriter; 092 } 093 094 @Override 095 public void runIndexingWorker(List<IndexingCommand> cmds) { 096 throw new UnsupportedOperationException("Not implemented"); 097 } 098 099 @Override 100 public void runReindexingWorker(String repositoryName, String nxql) { 101 throw new UnsupportedOperationException("Not implemented"); 102 } 103 104 @Override 105 public void indexNonRecursive(List<IndexingCommand> cmds) { 106 int nbCommands = cmds.size(); 107 if (nbCommands == 1) { 108 indexNonRecursive(cmds.get(0)); 109 return; 110 } 111 // simulate long indexing 112 // try {Thread.sleep(1000);} catch (InterruptedException e) { } 113 114 processBulkDeleteCommands(cmds); 115 Context stopWatch = bulkIndexTimer.time(); 116 try { 117 processBulkIndexCommands(cmds); 118 } finally { 119 stopWatch.stop(); 120 } 121 esa.totalCommandProcessed.addAndGet(nbCommands); 122 refreshIfNeeded(cmds); 123 } 124 125 void processBulkDeleteCommands(List<IndexingCommand> cmds) { 126 // Can be optimized with a single delete by query 127 for (IndexingCommand cmd : cmds) { 128 if (cmd.getType() == Type.DELETE) { 129 Context stopWatch = deleteTimer.time(); 130 try { 131 processDeleteCommand(cmd); 132 } finally { 133 stopWatch.stop(); 134 } 135 } 136 } 137 } 138 139 void processBulkIndexCommands(List<IndexingCommand> cmds) { 140 BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk(); 141 for (IndexingCommand cmd : cmds) { 142 if (cmd.getType() == Type.DELETE) { 143 continue; 144 } 145 try { 146 IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd); 147 if (idxRequest != null) { 148 bulkRequest.add(idxRequest); 149 } 150 } catch (ConcurrentUpdateException e) { 151 throw e; // bubble up, usually until AbstractWork catches it and maybe retries 152 } catch (DocumentNotFoundException e) { 153 log.info("Skip indexing command to bulk, doc does not exists anymore: " + cmd); 154 } catch (IllegalArgumentException e) { 155 log.error("Skip indexing command to bulk, fail to create request: " + cmd, e); 156 } 157 } 158 if (bulkRequest.numberOfActions() > 0) { 159 if (log.isDebugEnabled()) { 160 log.debug(String.format( 161 "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'", 162 bulkRequest.numberOfActions(), bulkRequest.request().requests().toString())); 163 } 164 BulkResponse response = bulkRequest.execute().actionGet(); 165 if (response.hasFailures()) { 166 log.error(response.buildFailureMessage()); 167 } 168 } 169 } 170 171 protected void refreshIfNeeded(List<IndexingCommand> cmds) { 172 for (IndexingCommand cmd : cmds) { 173 if (refreshIfNeeded(cmd)) 174 return; 175 } 176 } 177 178 private boolean refreshIfNeeded(IndexingCommand cmd) { 179 if (cmd.isSync()) { 180 esa.refresh(); 181 return true; 182 } 183 return false; 184 } 185 186 @Override 187 public void indexNonRecursive(IndexingCommand cmd) { 188 Context stopWatch = null; 189 try { 190 if (cmd.getType() == Type.DELETE) { 191 stopWatch = deleteTimer.time(); 192 processDeleteCommand(cmd); 193 } else { 194 stopWatch = indexTimer.time(); 195 processIndexCommand(cmd); 196 } 197 refreshIfNeeded(cmd); 198 } finally { 199 if (stopWatch != null) { 200 stopWatch.stop(); 201 } 202 esa.totalCommandProcessed.incrementAndGet(); 203 } 204 } 205 206 void processIndexCommand(IndexingCommand cmd) { 207 IndexRequestBuilder request; 208 try { 209 request = buildEsIndexingRequest(cmd); 210 } catch (DocumentNotFoundException e) { 211 request = null; 212 } catch (IllegalStateException e) { 213 log.error("Fail to create request for indexing command: " + cmd, e); 214 return; 215 } 216 if (request == null) { 217 log.info("Cancel indexing command because target document does not exists anymore: " + cmd); 218 return; 219 } 220 if (log.isDebugEnabled()) { 221 log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'", 222 esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(), 223 request.request().toString())); 224 } 225 request.execute().actionGet(); 226 } 227 228 void processDeleteCommand(IndexingCommand cmd) { 229 if (cmd.isRecurse()) { 230 processDeleteCommandRecursive(cmd); 231 } else { 232 processDeleteCommandNonRecursive(cmd); 233 } 234 } 235 236 void processDeleteCommandNonRecursive(IndexingCommand cmd) { 237 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 238 DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId()); 239 if (log.isDebugEnabled()) { 240 log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName, 241 DOC_TYPE, cmd.getTargetDocumentId())); 242 } 243 request.execute().actionGet(); 244 } 245 246 void processDeleteCommandRecursive(IndexingCommand cmd) { 247 String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName()); 248 // we don't want to rely on target document because the document can be 249 // already removed 250 String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId()); 251 if (docPath == null) { 252 if (!Framework.isTestModeSet()) { 253 log.warn("Trying to delete a non existing doc: " + cmd.toString()); 254 } 255 return; 256 } 257 QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath)); 258 DeleteByQueryRequestBuilder deleteRequest = esa.getClient().prepareDeleteByQuery(indexName).setTypes(DOC_TYPE).setQuery( 259 query); 260 if (log.isDebugEnabled()) { 261 log.debug(String.format( 262 "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName, 263 DOC_TYPE, query.toString())); 264 } 265 DeleteByQueryResponse responses = deleteRequest.execute().actionGet(); 266 for (IndexDeleteByQueryResponse response : responses) { 267 // there is no way to trace how many docs are removed 268 if (response.getFailedShards() > 0) { 269 log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(), 270 response.getTotalShards())); 271 } 272 } 273 } 274 275 /** 276 * Return the ecm:path of an ES document or null if not found. 277 */ 278 String getPathOfDocFromEs(String repository, String docId) { 279 String indexName = esa.getIndexNameForRepository(repository); 280 GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD); 281 if (log.isDebugEnabled()) { 282 log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", 283 indexName, DOC_TYPE, docId, PATH_FIELD)); 284 } 285 GetResponse ret = getRequest.execute().actionGet(); 286 if (!ret.isExists() || ret.getField(PATH_FIELD) == null) { 287 // doc not found 288 return null; 289 } 290 return ret.getField(PATH_FIELD).getValue().toString(); 291 } 292 293 /** 294 * Return indexing request or null if the doc does not exists anymore. 295 * 296 * @throws java.lang.IllegalStateException if the command is not attached to a session 297 */ 298 IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) { 299 DocumentModel doc = cmd.getTargetDocument(); 300 if (doc == null) { 301 return null; 302 } 303 try { 304 JsonFactory factory = new JsonFactory(); 305 XContentBuilder builder = jsonBuilder(); 306 JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream()); 307 jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null); 308 return esa.getClient().prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, 309 cmd.getTargetDocumentId()).setSource(builder); 310 } catch (IOException e) { 311 throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e); 312 } 313 } 314 315}