001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
025import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
026import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
027
028import java.io.IOException;
029import java.io.OutputStream;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Set;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.codehaus.jackson.JsonFactory;
037import org.codehaus.jackson.JsonGenerator;
038import org.elasticsearch.action.bulk.BulkItemResponse;
039import org.elasticsearch.action.bulk.BulkRequestBuilder;
040import org.elasticsearch.action.bulk.BulkResponse;
041import org.elasticsearch.action.delete.DeleteRequestBuilder;
042import org.elasticsearch.action.get.GetRequestBuilder;
043import org.elasticsearch.action.get.GetResponse;
044import org.elasticsearch.action.index.IndexRequestBuilder;
045import org.elasticsearch.action.search.SearchRequestBuilder;
046import org.elasticsearch.action.search.SearchResponse;
047import org.elasticsearch.common.io.stream.BytesStreamOutput;
048import org.elasticsearch.common.unit.TimeValue;
049import org.elasticsearch.index.VersionType;
050import org.elasticsearch.index.engine.VersionConflictEngineException;
051import org.elasticsearch.index.query.QueryBuilder;
052import org.elasticsearch.index.query.QueryBuilders;
053import org.elasticsearch.rest.RestStatus;
054import org.elasticsearch.search.SearchHit;
055import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
056import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
057import org.nuxeo.ecm.core.api.DocumentModel;
058import org.nuxeo.ecm.core.api.DocumentNotFoundException;
059import org.nuxeo.ecm.core.api.NuxeoException;
060import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
061import org.nuxeo.elasticsearch.commands.IndexingCommand;
062import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
063import org.nuxeo.runtime.api.Framework;
064import org.nuxeo.runtime.metrics.MetricsService;
065
066import com.codahale.metrics.MetricRegistry;
067import com.codahale.metrics.SharedMetricRegistries;
068import com.codahale.metrics.Timer;
069import com.codahale.metrics.Timer.Context;
070
071/**
072 * @since 6.0
073 */
074public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
075    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);
076
077    private final ElasticSearchAdminImpl esa;
078
079    private final Timer deleteTimer;
080
081    private final Timer indexTimer;
082
083    private final Timer bulkIndexTimer;
084
085    private final boolean useExternalVersion;
086
087    private JsonESDocumentWriter jsonESDocumentWriter;
088
089    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
090        this.esa = esa;
091        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
092        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
093        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
094        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
095        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
096        this.useExternalVersion = esa.useExternalVersion();
097    }
098
099    /**
100     * @since 7.2
101     */
102    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
103        this(esa);
104        this.jsonESDocumentWriter = jsonESDocumentWriter;
105    }
106
107    @Override
108    public void runIndexingWorker(List<IndexingCommand> cmds) {
109        throw new UnsupportedOperationException("Not implemented");
110    }
111
112    @Override
113    public void runReindexingWorker(String repositoryName, String nxql) {
114        throw new UnsupportedOperationException("Not implemented");
115    }
116
117    @Override
118    public void indexNonRecursive(List<IndexingCommand> cmds) {
119        int nbCommands = cmds.size();
120        if (nbCommands == 1) {
121            indexNonRecursive(cmds.get(0));
122            return;
123        }
124        // simulate long indexing
125        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
126
127        processBulkDeleteCommands(cmds);
128        Context stopWatch = bulkIndexTimer.time();
129        try {
130            processBulkIndexCommands(cmds);
131        } finally {
132            stopWatch.stop();
133        }
134        esa.totalCommandProcessed.addAndGet(nbCommands);
135        refreshIfNeeded(cmds);
136    }
137
138    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
139        // Can be optimized with a single delete by query
140        for (IndexingCommand cmd : cmds) {
141            if (cmd.getType() == Type.DELETE) {
142                Context stopWatch = deleteTimer.time();
143                try {
144                    processDeleteCommand(cmd);
145                } finally {
146                    stopWatch.stop();
147                }
148            }
149        }
150    }
151
152    void processBulkIndexCommands(List<IndexingCommand> cmds) {
153        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
154        Set<String> docIds = new HashSet<>(cmds.size());
155        for (IndexingCommand cmd : cmds) {
156            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
157                continue;
158            }
159            if (!docIds.add(cmd.getTargetDocumentId())) {
160                // do not submit the same doc 2 times
161                continue;
162            }
163            try {
164                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
165                if (idxRequest != null) {
166                    bulkRequest.add(idxRequest);
167                }
168            } catch (ConcurrentUpdateException e) {
169                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
170            } catch (DocumentNotFoundException e) {
171                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
172            } catch (IllegalArgumentException e) {
173                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
174            }
175        }
176        if (bulkRequest.numberOfActions() > 0) {
177            if (log.isDebugEnabled()) {
178                log.debug(String.format(
179                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
180                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
181            }
182            BulkResponse response = bulkRequest.execute().actionGet();
183            if (response.hasFailures()) {
184                logBulkFailure(response);
185            }
186        }
187    }
188
189    protected void logBulkFailure(BulkResponse response) {
190        boolean isError = false;
191        StringBuilder sb = new StringBuilder();
192        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
193        for (BulkItemResponse item : response.getItems()) {
194            if (item.isFailed()) {
195                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
196                    sb.append("\n  ").append(item.getFailureMessage());
197                } else {
198                    isError = true;
199                }
200            }
201        }
202        if (isError) {
203            log.error(response.buildFailureMessage());
204        } else {
205            log.info(sb);
206        }
207    }
208
209    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
210        for (IndexingCommand cmd : cmds) {
211            if (refreshIfNeeded(cmd))
212                return;
213        }
214    }
215
216    private boolean refreshIfNeeded(IndexingCommand cmd) {
217        if (cmd.isSync()) {
218            esa.refresh();
219            return true;
220        }
221        return false;
222    }
223
224    @Override
225    public void indexNonRecursive(IndexingCommand cmd) {
226        Type type = cmd.getType();
227        if (type == Type.UPDATE_DIRECT_CHILDREN) {
228            // the parent don't need to be indexed
229            return;
230        }
231        Context stopWatch = null;
232        try {
233            if (type == Type.DELETE) {
234                stopWatch = deleteTimer.time();
235                processDeleteCommand(cmd);
236            } else {
237                stopWatch = indexTimer.time();
238                processIndexCommand(cmd);
239            }
240            refreshIfNeeded(cmd);
241        } finally {
242            if (stopWatch != null) {
243                stopWatch.stop();
244            }
245            esa.totalCommandProcessed.incrementAndGet();
246        }
247    }
248
249    void processIndexCommand(IndexingCommand cmd) {
250        IndexRequestBuilder request;
251        try {
252            request = buildEsIndexingRequest(cmd);
253        } catch (DocumentNotFoundException e) {
254            request = null;
255        } catch (IllegalStateException e) {
256            log.error("Fail to create request for indexing command: " + cmd, e);
257            return;
258        }
259        if (request == null) {
260            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
261            return;
262        }
263        if (log.isDebugEnabled()) {
264            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
265                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
266                    request.request().toString()));
267        }
268        try {
269            request.execute().actionGet();
270        } catch (VersionConflictEngineException e) {
271            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
272                    + " a more recent version has already been indexed: " + e.getMessage());
273        }
274    }
275
276    void processDeleteCommand(IndexingCommand cmd) {
277        if (cmd.isRecurse()) {
278            processDeleteCommandRecursive(cmd);
279        } else {
280            processDeleteCommandNonRecursive(cmd);
281        }
282    }
283
284    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
285        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
286        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
287        if (log.isDebugEnabled()) {
288            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
289                    DOC_TYPE, cmd.getTargetDocumentId()));
290        }
291        request.execute().actionGet();
292    }
293
294    void processDeleteCommandRecursive(IndexingCommand cmd) {
295        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
296        // we don't want to rely on target document because the document can be
297        // already removed
298        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
299        if (docPath == null) {
300            if (!Framework.isTestModeSet()) {
301                log.warn("Trying to delete a non existing doc: " + cmd.toString());
302            }
303            return;
304        }
305        // Refresh index before bulk delete
306        esa.getClient().admin().indices().prepareRefresh(indexName).get();
307
308        // Run the scroll query
309        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
310        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
311        SearchRequestBuilder request = esa.getClient()
312                                          .prepareSearch(indexName)
313                                          .setTypes(DOC_TYPE)
314                                          .setScroll(keepAlive)
315                                          .setSize(100)
316                                          .setFetchSource(false)
317                                          .setQuery(query);
318        if (log.isDebugEnabled()) {
319            log.debug(String.format(
320                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
321                    indexName, DOC_TYPE, keepAlive, query.toString()));
322        }
323        for (SearchResponse response = request.execute().actionGet(); //
324        response.getHits().getHits().length > 0; //
325        response = runNextScroll(response, keepAlive)) {
326
327            // Build bulk delete request
328            BulkRequestBuilder bulkBuilder = esa.getClient().prepareBulk();
329            for (SearchHit hit : response.getHits().getHits()) {
330                bulkBuilder.add(esa.getClient().prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
331            }
332            if (log.isDebugEnabled()) {
333                log.debug(String.format("Bulk delete request on %s elements", bulkBuilder.numberOfActions()));
334            }
335            // Run bulk delete request
336            bulkBuilder.execute().actionGet();
337        }
338    }
339
340    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
341        if (log.isDebugEnabled()) {
342            log.debug(String.format(
343                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
344                    keepAlive, response.getScrollId()));
345        }
346        return esa.getClient().prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
347    }
348
349    /**
350     * Return the ecm:path of an ES document or null if not found.
351     */
352    String getPathOfDocFromEs(String repository, String docId) {
353        String indexName = esa.getIndexNameForRepository(repository);
354        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
355        if (log.isDebugEnabled()) {
356            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
357                    DOC_TYPE, docId, PATH_FIELD));
358        }
359        GetResponse ret = getRequest.execute().actionGet();
360        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
361            // doc not found
362            return null;
363        }
364        return ret.getField(PATH_FIELD).getValue().toString();
365    }
366
367    /**
368     * Return indexing request or null if the doc does not exists anymore.
369     *
370     * @throws java.lang.IllegalStateException if the command is not attached to a session
371     */
372    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
373        DocumentModel doc = cmd.getTargetDocument();
374        if (doc == null) {
375            return null;
376        }
377        try {
378            JsonFactory factory = new JsonFactory();
379            OutputStream out = new BytesStreamOutput();
380            JsonGenerator jsonGen = factory.createJsonGenerator(out);
381            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
382            IndexRequestBuilder ret = esa.getClient()
383                                         .prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
384                                                 cmd.getTargetDocumentId())
385                                         .setSource(jsonBuilder(out));
386            if (useExternalVersion && cmd.getOrder() > 0) {
387                ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder());
388            }
389            return ret;
390        } catch (IOException e) {
391            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
392        }
393    }
394
395}