/*
 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;

/**
 * Elasticsearch indexing implementation: turns {@link IndexingCommand}s into index, delete and bulk requests executed
 * against the write index of each repository.
 *
 * @since 6.0
 */
public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    // maximum length of a curl command line logged for debug
    private static final int MAX_CURL_LINE = 8 * 1024;

    // send the bulk indexing request when this size is reached, the optimal size is between 5 and 10 MB
    private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024;

    private final ElasticSearchAdminImpl esa;

    private final Timer deleteTimer;

    private final Timer indexTimer;

    private final Timer bulkIndexTimer;

    private final boolean useExternalVersion;

    private JsonESDocumentWriter jsonESDocumentWriter;

    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
        this.esa = esa;
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
        this.jsonESDocumentWriter = new JsonESDocumentWriter(); // default writer
        this.useExternalVersion = esa.useExternalVersion();
    }

    /**
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        this(esa);
        this.jsonESDocumentWriter = jsonESDocumentWriter;
    }

    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public void reindexRepository(String repositoryName) {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        int nbCommands = cmds.size();
        if (nbCommands == 1) {
            indexNonRecursive(cmds.get(0));
            return;
        }

        processBulkDeleteCommands(cmds);
        try (Context ignored = bulkIndexTimer.time()) {
            processBulkIndexCommands(cmds);
        }
        esa.totalCommandProcessed.addAndGet(nbCommands);
        refreshIfNeeded(cmds);
    }

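    /**
     * Processes the delete commands of the list one by one, outside of the bulk request.
     */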
    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
        // Can be optimized with a single delete by query
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE) {
                try (Context ignored = deleteTimer.time()) {
                    processDeleteCommand(cmd);
                }
            }
        }
    }

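    /**
     * Builds a bulk request for all the indexing commands, splitting it when the configured maximum bulk size is
     * exceeded. Delete and direct-children commands are skipped, and duplicated document ids are submitted only once.
     */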
    void processBulkIndexCommands(List<IndexingCommand> cmds) {
        BulkRequest bulkRequest = new BulkRequest();
        Set<String> docIds = new HashSet<>(cmds.size());
        int bulkSize = 0;
        final int maxBulkSize = getMaxBulkSize();
        for (IndexingCommand cmd : cmds) {
            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
                continue;
            }
            if (!docIds.add(cmd.getTargetDocumentId())) {
                // do not submit the same doc twice
                continue;
            }
            try {
                IndexRequest idxRequest = buildEsIndexingRequest(cmd);
                if (idxRequest != null) {
                    bulkSize += idxRequest.source().length();
                    bulkRequest.add(idxRequest);
                }
            } catch (ConcurrentUpdateException e) {
                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
            } catch (DocumentNotFoundException e) {
                log.info("Ignoring indexing command in bulk, the doc does not exist anymore: " + cmd);
            } catch (IllegalArgumentException e) {
                log.error("Ignoring indexing command in bulk, failed to create the request: " + cmd, e);
            }
            if (bulkSize > maxBulkSize) {
                log.warn("Max bulk size reached " + bulkSize + ", sending bulk command");
                sendBulkCommand(bulkRequest, bulkSize);
                bulkRequest = new BulkRequest();
                bulkSize = 0;
            }
        }
        sendBulkCommand(bulkRequest, bulkSize);
    }

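    /**
     * Returns the maximum bulk request size in bytes, read from the {@code INDEX_BULK_MAX_SIZE_PROPERTY} framework
     * property, defaulting to {@link #DEFAULT_MAX_BULK_SIZE}.
     */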
    int getMaxBulkSize() {
        String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE));
        return Integer.parseInt(value);
    }

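    /**
     * Submits the bulk request to Elasticsearch if it contains at least one action, logging any failure.
     */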
    void sendBulkCommand(BulkRequest bulkRequest, int bulkSize) {
        if (bulkRequest.numberOfActions() > 0) {
            if (log.isDebugEnabled()) {
                logDebugMessageTruncated(String.format(
                        "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
                        bulkRequest.numberOfActions(), bulkSize, bulkRequest.requests().toString()), MAX_CURL_LINE);
            }
            BulkResponse response = esa.getClient().bulk(bulkRequest);
            if (response.hasFailures()) {
                logBulkFailure(response);
            }
        }
    }

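    /**
     * Logs bulk failures: version conflicts are expected and reported at debug level, any other failure is reported as
     * an error.
     */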
    void logBulkFailure(BulkResponse response) {
        boolean isError = false;
        StringBuilder sb = new StringBuilder();
        sb.append("Ignoring indexing of some docs because more recent versions have already been indexed");
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
                    sb.append("\n  ").append(item.getFailureMessage());
                } else {
                    isError = true;
                }
            }
        }
        if (isError) {
            log.error(response.buildFailureMessage());
        } else {
            log.debug(sb);
        }
    }

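    /**
     * Triggers a single index refresh if at least one of the commands is synchronous.
     */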
    void refreshIfNeeded(List<IndexingCommand> cmds) {
        for (IndexingCommand cmd : cmds) {
            if (refreshIfNeeded(cmd)) {
                return;
            }
        }
    }

    boolean refreshIfNeeded(IndexingCommand cmd) {
        if (cmd.isSync()) {
            esa.refresh();
            return true;
        }
        return false;
    }

    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        Type type = cmd.getType();
        if (type == Type.UPDATE_DIRECT_CHILDREN) {
            // the parent doesn't need to be indexed
            return;
        }
        if (type == Type.DELETE) {
            try (Context ignored = deleteTimer.time()) {
                processDeleteCommand(cmd);
            }
        } else {
            try (Context ignored = indexTimer.time()) {
                processIndexCommand(cmd);
            }
        }
        refreshIfNeeded(cmd);
        esa.totalCommandProcessed.incrementAndGet();
    }

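    /**
     * Builds and executes a single index request, ignoring the command when the target document does not exist anymore
     * or when a more recent version has already been indexed.
     */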
    void processIndexCommand(IndexingCommand cmd) {
        IndexRequest request;
        try {
            request = buildEsIndexingRequest(cmd);
        } catch (DocumentNotFoundException e) {
            request = null;
        } catch (IllegalStateException e) {
            log.error("Failed to create the request for indexing command: " + cmd, e);
            return;
        }
        if (request == null) {
            log.info("Canceling indexing command because the target document does not exist anymore: " + cmd);
            return;
        }
        }
        if (log.isDebugEnabled()) {
            logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
                    getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
                    request.toString()), MAX_CURL_LINE);
        }
        try {
            esa.getClient().index(request);
        } catch (VersionConflictEngineException e) {
            SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId());
            log.info("Ignoring indexing of doc " + cmd.getTargetDocumentId()
                    + ", a more recent version has already been indexed: " + e.getMessage());
        }
    }

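    /**
     * Logs the message at debug level, truncating it to {@code maxSize} characters unless trace level is enabled.
     */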
    void logDebugMessageTruncated(String msg, int maxSize) {
        if (log.isTraceEnabled() || msg.length() < maxSize) {
            // in trace mode we output the full message
            log.debug(msg);
        } else {
            log.debug(msg.substring(0, maxSize) + "...");
        }
    }

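    /**
     * Dispatches the delete command to the recursive or non-recursive implementation.
     */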
    void processDeleteCommand(IndexingCommand cmd) {
        if (cmd.isRecurse()) {
            processDeleteCommandRecursive(cmd);
        } else {
            processDeleteCommandNonRecursive(cmd);
        }
    }

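    /**
     * Deletes a single document from the write index of the repository.
     */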
    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        DeleteRequest request = new DeleteRequest(indexName, DOC_TYPE, cmd.getTargetDocumentId());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
                    DOC_TYPE, cmd.getTargetDocumentId()));
        }
        esa.getClient().delete(request);
    }

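    /**
     * Handles a recursive delete: retrieves the document path from Elasticsearch, then removes all matching documents
     * using a scroll search followed by bulk delete requests.
     */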
    void processDeleteCommandRecursive(IndexingCommand cmd) {
        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
        // we don't rely on the target document because it may already have been removed
        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
        if (docPath == null) {
            if (!Framework.isTestModeSet()) {
                log.warn("Trying to delete a non-existing doc: " + cmd.toString());
            }
            return;
        }
        // Refresh index before bulk delete
        esa.getClient().refresh(indexName);

        // Run the scroll query
        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
        SearchSourceBuilder search = new SearchSourceBuilder().size(100).query(query).fetchSource(false);
        SearchRequest request = new SearchRequest(indexName).scroll(keepAlive).source(search);
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
                    indexName, DOC_TYPE, keepAlive, query.toString()));
        }
        for (SearchResponse response = esa.getClient().search(request); //
                response.getHits().getHits().length > 0; //
                response = runNextScroll(response, keepAlive)) {

            // Build bulk delete request
            BulkRequest bulkRequest = new BulkRequest();
            for (SearchHit hit : response.getHits().getHits()) {
                bulkRequest.add(new DeleteRequest(hit.getIndex(), hit.getType(), hit.getId()));
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Bulk delete request on %s elements", bulkRequest.numberOfActions()));
            }
            // Run bulk delete request
            esa.getClient().bulk(bulkRequest);
        }
    }

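    /**
     * Fetches the next batch of hits for the scroll search.
     */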
    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
                    keepAlive, response.getScrollId()));
        }
        SearchScrollRequest request = new SearchScrollRequest(response.getScrollId()).scroll(keepAlive);
        return esa.getClient().searchScroll(request);
    }

    /**
     * Returns the ecm:path of an ES document, or null if not found.
     */
    String getPathOfDocFromEs(String repository, String docId) {
        String indexName = getWriteIndexForRepository(repository);
        GetRequest request = new GetRequest(indexName, DOC_TYPE, docId).fetchSourceContext(
                new FetchSourceContext(true, new String[] { PATH_FIELD }, null));
        if (log.isDebugEnabled()) {
            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
                    DOC_TYPE, docId, PATH_FIELD));
        }
        GetResponse ret = esa.getClient().get(request);
        if (!ret.isExists() || ret.getSource() == null || ret.getSource().get(PATH_FIELD) == null) {
            // doc not found
            return null;
        }
        return ret.getSource().get(PATH_FIELD).toString();
    }

    /**
     * Returns the indexing request, or null if the doc does not exist anymore.
     *
     * @throws java.lang.IllegalStateException if the command is not attached to a session
     */
    IndexRequest buildEsIndexingRequest(IndexingCommand cmd) {
        DocumentModel doc = cmd.getTargetDocument();
        if (doc == null) {
            return null;
        }
        try {
            JsonFactory factory = new JsonFactory();
            OutputStream out = new BytesStreamOutput();
            JsonGenerator jsonGen = factory.createJsonGenerator(out);
            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
            IndexRequest request = new IndexRequest(getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE,
                    cmd.getTargetDocumentId()).source(jsonBuilder(out));
            if (useExternalVersion && cmd.getOrder() > 0) {
                request.versionType(VersionType.EXTERNAL).version(cmd.getOrder());
            }
            return request;
        } catch (IOException e) {
            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
        }
    }

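    /**
     * Returns the name of the write index (or alias) associated with the repository.
     */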
    protected String getWriteIndexForRepository(String repository) {
        return esa.getWriteIndexName(esa.getIndexNameForRepository(repository));
    }

}