001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
025import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
026import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY;
027import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
028
029import java.io.IOException;
030import java.io.OutputStream;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Set;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.delete.DeleteRequestBuilder;
043import org.elasticsearch.action.get.GetRequestBuilder;
044import org.elasticsearch.action.get.GetResponse;
045import org.elasticsearch.action.index.IndexRequestBuilder;
046import org.elasticsearch.action.search.SearchRequestBuilder;
047import org.elasticsearch.action.search.SearchResponse;
048import org.elasticsearch.common.io.stream.BytesStreamOutput;
049import org.elasticsearch.common.unit.TimeValue;
050import org.elasticsearch.index.VersionType;
051import org.elasticsearch.index.engine.VersionConflictEngineException;
052import org.elasticsearch.index.query.QueryBuilder;
053import org.elasticsearch.index.query.QueryBuilders;
054import org.elasticsearch.rest.RestStatus;
055import org.elasticsearch.search.SearchHit;
056import org.nuxeo.common.logging.SequenceTracer;
057import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
058import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
059import org.nuxeo.ecm.core.api.DocumentModel;
060import org.nuxeo.ecm.core.api.DocumentNotFoundException;
061import org.nuxeo.ecm.core.api.NuxeoException;
062import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
063import org.nuxeo.elasticsearch.commands.IndexingCommand;
064import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
065import org.nuxeo.runtime.api.Framework;
066import org.nuxeo.runtime.metrics.MetricsService;
067
068import com.codahale.metrics.MetricRegistry;
069import com.codahale.metrics.SharedMetricRegistries;
070import com.codahale.metrics.Timer;
071import com.codahale.metrics.Timer.Context;
072
073/**
074 * @since 6.0
075 */
076public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    /** Logger shared by all instances of this class. */
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    // Maximum length of a debug "curl" line; longer messages are truncated by
    // logDebugMessageTruncated unless trace logging is enabled.
    private static final int MAX_CURL_LINE = 8 * 1024;

    // send the bulk indexing command when this size is reached, optimal is 5-10m
    // (used as the default when INDEX_BULK_MAX_SIZE_PROPERTY is not set)
    private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024;

    /** Admin service providing the client, index names, refresh and the processed-command counter. */
    private final ElasticSearchAdminImpl esa;

    /** Timer wrapped around each delete command. */
    private final Timer deleteTimer;

    /** Timer wrapped around each single (non bulk) index command. */
    private final Timer indexTimer;

    /** Timer wrapped around the indexing part of a bulk of commands. */
    private final Timer bulkIndexTimer;

    /** When true, index requests with a positive command order use external versioning. */
    private final boolean useExternalVersion;

    /** Writer used to serialize a document into the JSON source of an index request. */
    private JsonESDocumentWriter jsonESDocumentWriter;
096
097    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
098        this.esa = esa;
099        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
100        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
101        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
102        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
103        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
104        this.useExternalVersion = esa.useExternalVersion();
105    }
106
    /**
     * Creates an indexing service that serializes documents with the given writer instead of the
     * default one.
     *
     * @param esa the admin service used to reach the client and the index names
     * @param jsonESDocumentWriter the writer used to produce the JSON source of index requests
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        // the delegated constructor installs a default writer that is overridden right after
        this(esa);
        this.jsonESDocumentWriter = jsonESDocumentWriter;
    }
114
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }
119
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql) {
        throw new UnsupportedOperationException("Not implemented");
    }
124
125    @Override
126    public void indexNonRecursive(List<IndexingCommand> cmds) {
127        int nbCommands = cmds.size();
128        if (nbCommands == 1) {
129            indexNonRecursive(cmds.get(0));
130            return;
131        }
132        // simulate long indexing
133        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
134
135        processBulkDeleteCommands(cmds);
136        try (Context ignored = bulkIndexTimer.time()) {
137            processBulkIndexCommands(cmds);
138        }
139        esa.totalCommandProcessed.addAndGet(nbCommands);
140        refreshIfNeeded(cmds);
141    }
142
143    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
144        // Can be optimized with a single delete by query
145        for (IndexingCommand cmd : cmds) {
146            if (cmd.getType() == Type.DELETE) {
147                try (Context ignored = deleteTimer.time()){
148                    processDeleteCommand(cmd);
149                }
150            }
151        }
152    }
153
154    void processBulkIndexCommands(List<IndexingCommand> cmds) {
155        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
156        Set<String> docIds = new HashSet<>(cmds.size());
157        int bulkSize = 0;
158        final int maxBulkSize = getMaxBulkSize();
159        for (IndexingCommand cmd : cmds) {
160            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
161                continue;
162            }
163            if (!docIds.add(cmd.getTargetDocumentId())) {
164                // do not submit the same doc 2 times
165                continue;
166            }
167            try {
168                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
169                if (idxRequest != null) {
170                    bulkSize += idxRequest.request().source().length();
171                    bulkRequest.add(idxRequest);
172                }
173            } catch (ConcurrentUpdateException e) {
174                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
175            } catch (DocumentNotFoundException e) {
176                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
177            } catch (IllegalArgumentException e) {
178                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
179            }
180            if (bulkSize > maxBulkSize) {
181                log.warn("Max bulk size reached " + bulkSize + ", sending bulk command");
182                sendBulkCommand(bulkRequest, bulkSize);
183                bulkRequest = esa.getClient().prepareBulk();
184                bulkSize = 0;
185            }
186        }
187        sendBulkCommand(bulkRequest, bulkSize);
188    }
189
190    int getMaxBulkSize() {
191        String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE));
192        return Integer.parseInt(value);
193    }
194
195    void sendBulkCommand(BulkRequestBuilder bulkRequest, int bulkSize) {
196        if (bulkRequest.numberOfActions() > 0) {
197            if (log.isDebugEnabled()) {
198                logDebugMessageTruncated(String.format(
199                        "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
200                        bulkRequest.numberOfActions(), bulkSize,
201                        bulkRequest.request().requests().toString()), MAX_CURL_LINE);
202            }
203            BulkResponse response = bulkRequest.execute().actionGet();
204            if (response.hasFailures()) {
205                logBulkFailure(response);
206            }
207        }
208    }
209
210    void logBulkFailure(BulkResponse response) {
211        boolean isError = false;
212        StringBuilder sb = new StringBuilder();
213        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
214        for (BulkItemResponse item : response.getItems()) {
215            if (item.isFailed()) {
216                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
217                    sb.append("\n  ").append(item.getFailureMessage());
218                } else {
219                    isError = true;
220                }
221            }
222        }
223        if (isError) {
224            log.error(response.buildFailureMessage());
225        } else {
226            log.info(sb);
227        }
228    }
229
230    void refreshIfNeeded(List<IndexingCommand> cmds) {
231        for (IndexingCommand cmd : cmds) {
232            if (refreshIfNeeded(cmd))
233                return;
234        }
235    }
236
237    boolean refreshIfNeeded(IndexingCommand cmd) {
238        if (cmd.isSync()) {
239            esa.refresh();
240            return true;
241        }
242        return false;
243    }
244
245    @Override
246    public void indexNonRecursive(IndexingCommand cmd) {
247        Type type = cmd.getType();
248        if (type == Type.UPDATE_DIRECT_CHILDREN) {
249            // the parent don't need to be indexed
250            return;
251        }
252        if (type == Type.DELETE) {
253            try (Context ignored = deleteTimer.time()) {
254                processDeleteCommand(cmd);
255            }
256        } else {
257            try (Context ignored = indexTimer.time()) {
258                processIndexCommand(cmd);
259            }
260        }
261        refreshIfNeeded(cmd);
262        esa.totalCommandProcessed.incrementAndGet();
263    }
264
265    void processIndexCommand(IndexingCommand cmd) {
266        IndexRequestBuilder request;
267        try {
268            request = buildEsIndexingRequest(cmd);
269        } catch (DocumentNotFoundException e) {
270            request = null;
271        } catch (IllegalStateException e) {
272            log.error("Fail to create request for indexing command: " + cmd, e);
273            return;
274        }
275        if (request == null) {
276            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
277            return;
278        }
279        if (log.isDebugEnabled()) {
280            logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
281                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
282                    request.request().toString()), MAX_CURL_LINE);
283        }
284        try {
285            request.execute().actionGet();
286        } catch (VersionConflictEngineException e) {
287            SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId());
288            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
289                    + " a more recent version has already been indexed: " + e.getMessage());
290        }
291    }
292
293    void logDebugMessageTruncated(String msg, int maxSize) {
294        if (log.isTraceEnabled() || msg.length() < maxSize) {
295            // in trace mode we output the full message
296            log.debug(msg);
297        } else {
298            log.debug(msg.substring(0, maxSize) + "...");
299        }
300    }
301
302    void processDeleteCommand(IndexingCommand cmd) {
303        if (cmd.isRecurse()) {
304            processDeleteCommandRecursive(cmd);
305        } else {
306            processDeleteCommandNonRecursive(cmd);
307        }
308    }
309
310    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
311        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
312        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
313        if (log.isDebugEnabled()) {
314            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
315                    DOC_TYPE, cmd.getTargetDocumentId()));
316        }
317        request.execute().actionGet();
318    }
319
320    void processDeleteCommandRecursive(IndexingCommand cmd) {
321        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
322        // we don't want to rely on target document because the document can be
323        // already removed
324        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
325        if (docPath == null) {
326            if (!Framework.isTestModeSet()) {
327                log.warn("Trying to delete a non existing doc: " + cmd.toString());
328            }
329            return;
330        }
331        // Refresh index before bulk delete
332        esa.getClient().admin().indices().prepareRefresh(indexName).get();
333
334        // Run the scroll query
335        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
336        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
337        SearchRequestBuilder request = esa.getClient()
338                                          .prepareSearch(indexName)
339                                          .setTypes(DOC_TYPE)
340                                          .setScroll(keepAlive)
341                                          .setSize(100)
342                                          .setFetchSource(false)
343                                          .setQuery(query);
344        if (log.isDebugEnabled()) {
345            log.debug(String.format(
346                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
347                    indexName, DOC_TYPE, keepAlive, query.toString()));
348        }
349        for (SearchResponse response = request.execute().actionGet(); //
350        response.getHits().getHits().length > 0; //
351        response = runNextScroll(response, keepAlive)) {
352
353            // Build bulk delete request
354            BulkRequestBuilder bulkBuilder = esa.getClient().prepareBulk();
355            for (SearchHit hit : response.getHits().getHits()) {
356                bulkBuilder.add(esa.getClient().prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
357            }
358            if (log.isDebugEnabled()) {
359                log.debug(String.format("Bulk delete request on %s elements", bulkBuilder.numberOfActions()));
360            }
361            // Run bulk delete request
362            bulkBuilder.execute().actionGet();
363        }
364    }
365
366    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
367        if (log.isDebugEnabled()) {
368            log.debug(String.format(
369                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
370                    keepAlive, response.getScrollId()));
371        }
372        return esa.getClient().prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
373    }
374
375    /**
376     * Return the ecm:path of an ES document or null if not found.
377     */
378    String getPathOfDocFromEs(String repository, String docId) {
379        String indexName = esa.getIndexNameForRepository(repository);
380        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
381        if (log.isDebugEnabled()) {
382            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
383                    DOC_TYPE, docId, PATH_FIELD));
384        }
385        GetResponse ret = getRequest.execute().actionGet();
386        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
387            // doc not found
388            return null;
389        }
390        return ret.getField(PATH_FIELD).getValue().toString();
391    }
392
393    /**
394     * Return indexing request or null if the doc does not exists anymore.
395     *
396     * @throws java.lang.IllegalStateException if the command is not attached to a session
397     */
398    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
399        DocumentModel doc = cmd.getTargetDocument();
400        if (doc == null) {
401            return null;
402        }
403        try {
404            JsonFactory factory = new JsonFactory();
405            OutputStream out = new BytesStreamOutput();
406            JsonGenerator jsonGen = factory.createJsonGenerator(out);
407            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
408            IndexRequestBuilder ret = esa.getClient()
409                                         .prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
410                                                 cmd.getTargetDocumentId())
411                                         .setSource(jsonBuilder(out));
412            if (useExternalVersion && cmd.getOrder() > 0) {
413                ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder());
414            }
415            return ret;
416        } catch (IOException e) {
417            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
418        }
419    }
420
421}