001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
025import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
026import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEX_BULK_MAX_SIZE_PROPERTY;
027import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
028
029import java.io.IOException;
030import java.io.OutputStream;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Set;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.elasticsearch.action.bulk.BulkItemResponse;
038import org.elasticsearch.action.bulk.BulkRequest;
039import org.elasticsearch.action.bulk.BulkResponse;
040import org.elasticsearch.action.delete.DeleteRequest;
041import org.elasticsearch.action.get.GetRequest;
042import org.elasticsearch.action.get.GetResponse;
043import org.elasticsearch.action.index.IndexRequest;
044import org.elasticsearch.action.search.SearchRequest;
045import org.elasticsearch.action.search.SearchResponse;
046import org.elasticsearch.action.search.SearchScrollRequest;
047import org.elasticsearch.common.io.stream.BytesStreamOutput;
048import org.elasticsearch.common.unit.TimeValue;
049import org.elasticsearch.index.VersionType;
050import org.elasticsearch.index.engine.VersionConflictEngineException;
051import org.elasticsearch.index.query.QueryBuilder;
052import org.elasticsearch.index.query.QueryBuilders;
053import org.elasticsearch.rest.RestStatus;
054import org.elasticsearch.search.SearchHit;
055import org.elasticsearch.search.builder.SearchSourceBuilder;
056import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
057import org.nuxeo.common.logging.SequenceTracer;
058import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
059import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
060import org.nuxeo.ecm.core.api.DocumentModel;
061import org.nuxeo.ecm.core.api.DocumentNotFoundException;
062import org.nuxeo.ecm.core.api.NuxeoException;
063import org.nuxeo.ecm.core.api.model.BlobNotFoundException;
064import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
065import org.nuxeo.elasticsearch.commands.IndexingCommand;
066import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
067import org.nuxeo.runtime.api.Framework;
068import org.nuxeo.runtime.metrics.MetricsService;
069
070import com.codahale.metrics.MetricRegistry;
071import com.codahale.metrics.SharedMetricRegistries;
072import com.codahale.metrics.Timer;
073import com.codahale.metrics.Timer.Context;
074import com.fasterxml.jackson.core.JsonFactory;
075import com.fasterxml.jackson.core.JsonGenerator;
076
077/**
078 * @since 6.0
079 */
080public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
    // Logger shared by all indexing operations of this class.
    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);

    // Maximum number of characters of a curl-like debug line that is logged before truncation
    // (see logDebugMessageTruncated); the full line is only emitted when trace is enabled.
    private static final int MAX_CURL_LINE = 8 * 1024;

    // Default value used when INDEX_BULK_MAX_SIZE_PROPERTY is not set: the bulk indexing request is sent
    // as soon as the accumulated source size exceeds this many bytes, optimal is 5-10m.
    private static final int DEFAULT_MAX_BULK_SIZE = 5 * 1024 * 1024;

    // Admin service giving access to the client, the index names, the refresh operation and the
    // processed command counter.
    private final ElasticSearchAdminImpl esa;

    // Times each delete command (metric "nuxeo.elasticsearch.service.delete").
    private final Timer deleteTimer;

    // Times each single document indexing (metric "nuxeo.elasticsearch.service.index").
    private final Timer indexTimer;

    // Times the indexing part of a bulk of commands (metric "nuxeo.elasticsearch.service.bulkIndex").
    private final Timer bulkIndexTimer;

    // Value of esa.useExternalVersion() captured at construction time; when true, index requests built
    // from a command with a positive order use that order as an external version.
    private final boolean useExternalVersion;

    // Writer used to serialize a document into the JSON source of an index request.
    private JsonESDocumentWriter jsonESDocumentWriter;
100
101    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
102        this.esa = esa;
103        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
104        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
105        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
106        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
107        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
108        this.useExternalVersion = esa.useExternalVersion();
109    }
110
    /**
     * Creates an indexing service like {@link #ElasticSearchIndexingImpl(ElasticSearchAdminImpl)} but replaces the
     * default document writer with the given one.
     *
     * @param esa the admin service providing the client, the index names and the external version flag
     * @param jsonESDocumentWriter the writer used to serialize documents into index request sources
     * @since 7.2
     */
    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
        // initialize timers and flags first, then override the default writer
        this(esa);
        this.jsonESDocumentWriter = jsonESDocumentWriter;
    }
118
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        throw new UnsupportedOperationException("Not implemented");
    }
123
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
        throw new UnsupportedOperationException("Not implemented");
    }
128
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void reindexRepository(String repositoryName) {
        throw new UnsupportedOperationException("Not implemented");
    }
133
134    @Override
135    public void indexNonRecursive(List<IndexingCommand> cmds) {
136        int nbCommands = cmds.size();
137        if (nbCommands == 1) {
138            indexNonRecursive(cmds.get(0));
139            return;
140        }
141        // simulate long indexing
142        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
143
144        processBulkDeleteCommands(cmds);
145        try (Context ignored = bulkIndexTimer.time()) {
146            processBulkIndexCommands(cmds);
147        }
148        esa.totalCommandProcessed.addAndGet(nbCommands);
149        refreshIfNeeded(cmds);
150    }
151
152    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
153        // Can be optimized with a single delete by query
154        for (IndexingCommand cmd : cmds) {
155            if (cmd.getType() == Type.DELETE) {
156                try (Context ignored = deleteTimer.time()) {
157                    processDeleteCommand(cmd);
158                }
159            }
160        }
161    }
162
163    void processBulkIndexCommands(List<IndexingCommand> cmds) {
164        BulkRequest bulkRequest = new BulkRequest();
165        Set<String> docIds = new HashSet<>(cmds.size());
166        int bulkSize = 0;
167        final int maxBulkSize = getMaxBulkSize();
168        for (IndexingCommand cmd : cmds) {
169            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
170                continue;
171            }
172            if (!docIds.add(cmd.getTargetDocumentId())) {
173                // do not submit the same doc 2 times
174                continue;
175            }
176            try {
177                IndexRequest idxRequest = buildEsIndexingRequest(cmd);
178                if (idxRequest != null) {
179                    bulkSize += idxRequest.source().length();
180                    bulkRequest.add(idxRequest);
181                }
182            } catch (BlobNotFoundException be) {
183                log.info("Ignore indexing command in bulk, blob does not exists anymore: " + cmd);
184            } catch (ConcurrentUpdateException e) {
185                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
186            } catch (DocumentNotFoundException e) {
187                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
188            } catch (IllegalArgumentException e) {
189                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
190            }
191            if (bulkSize > maxBulkSize) {
192                log.warn("Max bulk size reached " + bulkSize + ", sending bulk command");
193                sendBulkCommand(bulkRequest, bulkSize);
194                bulkRequest = new BulkRequest();
195                bulkSize = 0;
196            }
197        }
198        sendBulkCommand(bulkRequest, bulkSize);
199    }
200
201    int getMaxBulkSize() {
202        String value = Framework.getProperty(INDEX_BULK_MAX_SIZE_PROPERTY, String.valueOf(DEFAULT_MAX_BULK_SIZE));
203        return Integer.parseInt(value);
204    }
205
206    void sendBulkCommand(BulkRequest bulkRequest, int bulkSize) {
207        if (bulkRequest.numberOfActions() > 0) {
208            if (log.isDebugEnabled()) {
209                logDebugMessageTruncated(String.format(
210                        "Index %d docs (%d bytes) in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
211                        bulkRequest.numberOfActions(), bulkSize, bulkRequest.requests().toString()), MAX_CURL_LINE);
212            }
213            BulkResponse response = esa.getClient().bulk(bulkRequest);
214            if (response.hasFailures()) {
215                logBulkFailure(response);
216            }
217        }
218    }
219
220    void logBulkFailure(BulkResponse response) {
221        boolean isError = false;
222        StringBuilder sb = new StringBuilder();
223        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
224        for (BulkItemResponse item : response.getItems()) {
225            if (item.isFailed()) {
226                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
227                    sb.append("\n  ").append(item.getFailureMessage());
228                } else {
229                    isError = true;
230                }
231            }
232        }
233        if (isError) {
234            log.error(response.buildFailureMessage());
235        } else {
236            log.debug(sb);
237        }
238    }
239
240    void refreshIfNeeded(List<IndexingCommand> cmds) {
241        for (IndexingCommand cmd : cmds) {
242            if (refreshIfNeeded(cmd))
243                return;
244        }
245    }
246
247    boolean refreshIfNeeded(IndexingCommand cmd) {
248        if (cmd.isSync()) {
249            esa.refresh();
250            return true;
251        }
252        return false;
253    }
254
255    @Override
256    public void indexNonRecursive(IndexingCommand cmd) {
257        Type type = cmd.getType();
258        if (type == Type.UPDATE_DIRECT_CHILDREN) {
259            // the parent don't need to be indexed
260            return;
261        }
262        if (type == Type.DELETE) {
263            try (Context ignored = deleteTimer.time()) {
264                processDeleteCommand(cmd);
265            }
266        } else {
267            try (Context ignored = indexTimer.time()) {
268                processIndexCommand(cmd);
269            }
270        }
271        refreshIfNeeded(cmd);
272        esa.totalCommandProcessed.incrementAndGet();
273    }
274
275    void processIndexCommand(IndexingCommand cmd) {
276        IndexRequest request;
277        try {
278            request = buildEsIndexingRequest(cmd);
279        } catch (BlobNotFoundException pe) {
280            request = null;
281        } catch (DocumentNotFoundException e) {
282            request = null;
283        } catch (IllegalStateException e) {
284            log.error("Fail to create request for indexing command: " + cmd, e);
285            return;
286        }
287        if (request == null) {
288            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
289            return;
290        }
291        if (log.isDebugEnabled()) {
292            logDebugMessageTruncated(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
293                    getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
294                    request.toString()), MAX_CURL_LINE);
295        }
296        try {
297            esa.getClient().index(request);
298        } catch (ConcurrentUpdateException e) {
299            SequenceTracer.addNote("Ignore indexing of doc " + cmd.getTargetDocumentId());
300            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId()
301                    + " a more recent version has already been indexed: " + e.getMessage());
302        }
303    }
304
305    void logDebugMessageTruncated(String msg, int maxSize) {
306        if (log.isTraceEnabled() || msg.length() < maxSize) {
307            // in trace mode we output the full message
308            log.debug(msg);
309        } else {
310            log.debug(msg.substring(0, maxSize) + "...");
311        }
312    }
313
314    void processDeleteCommand(IndexingCommand cmd) {
315        if (cmd.isRecurse()) {
316            processDeleteCommandRecursive(cmd);
317        } else {
318            processDeleteCommandNonRecursive(cmd);
319        }
320    }
321
322    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
323        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
324        DeleteRequest request = new DeleteRequest(indexName, DOC_TYPE, cmd.getTargetDocumentId());
325        if (log.isDebugEnabled()) {
326            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
327                    DOC_TYPE, cmd.getTargetDocumentId()));
328        }
329        esa.getClient().delete(request);
330    }
331
332    void processDeleteCommandRecursive(IndexingCommand cmd) {
333        String indexName = getWriteIndexForRepository(cmd.getRepositoryName());
334        // we don't want to rely on target document because the document can be
335        // already removed
336        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
337        if (docPath == null) {
338            if (!Framework.isTestModeSet()) {
339                log.warn("Trying to delete a non existing doc: " + cmd.toString());
340            }
341            return;
342        }
343        // Refresh index before bulk delete
344        esa.getClient().refresh(indexName);
345
346        // Run the scroll query
347        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(CHILDREN_FIELD, docPath));
348        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
349        SearchSourceBuilder search = new SearchSourceBuilder().size(100).query(query).fetchSource(false);
350        SearchRequest request = new SearchRequest(indexName).scroll(keepAlive).source(search);
351        if (log.isDebugEnabled()) {
352            log.debug(String.format(
353                    "Search with scroll request: curl -XGET 'http://localhost:9200/%s/%s/_search?scroll=%s' -d '%s'",
354                    indexName, DOC_TYPE, keepAlive, query.toString()));
355        }
356        for (SearchResponse response = esa.getClient().search(request); //
357                response.getHits().getHits().length > 0; //
358                response = runNextScroll(response, keepAlive)) {
359
360            // Build bulk delete request
361            BulkRequest bulkRequest = new BulkRequest();
362            for (SearchHit hit : response.getHits().getHits()) {
363                bulkRequest.add(new DeleteRequest(hit.getIndex(), hit.getType(), hit.getId()));
364            }
365            if (log.isDebugEnabled()) {
366                log.debug(String.format("Bulk delete request on %s elements", bulkRequest.numberOfActions()));
367            }
368            // Run bulk delete request
369            esa.getClient().bulk(bulkRequest);
370        }
371    }
372
373    SearchResponse runNextScroll(SearchResponse response, TimeValue keepAlive) {
374        if (log.isDebugEnabled()) {
375            log.debug(String.format(
376                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
377                    keepAlive, response.getScrollId()));
378        }
379        SearchScrollRequest request = new SearchScrollRequest(response.getScrollId()).scroll(keepAlive);
380        return esa.getClient().searchScroll(request);
381    }
382
383    /**
384     * Return the ecm:path of an ES document or null if not found.
385     */
386    String getPathOfDocFromEs(String repository, String docId) {
387        String indexName = getWriteIndexForRepository(repository);
388        GetRequest request = new GetRequest(indexName, DOC_TYPE, docId).fetchSourceContext(
389                new FetchSourceContext(true, new String[] { PATH_FIELD }, null));
390        if (log.isDebugEnabled()) {
391            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'", indexName,
392                    DOC_TYPE, docId, PATH_FIELD));
393        }
394        GetResponse ret = esa.getClient().get(request);
395        if (!ret.isExists() || ret.getSource() == null || ret.getSource().get(PATH_FIELD) == null) {
396            // doc not found
397            return null;
398        }
399        return ret.getSource().get(PATH_FIELD).toString();
400    }
401
402    /**
403     * Return indexing request or null if the doc does not exists anymore.
404     *
405     * @throws java.lang.IllegalStateException if the command is not attached to a session
406     */
407    IndexRequest buildEsIndexingRequest(IndexingCommand cmd) {
408        DocumentModel doc = cmd.getTargetDocument();
409        if (doc == null) {
410            return null;
411        }
412        try {
413            JsonFactory factory = new JsonFactory();
414            OutputStream out = new BytesStreamOutput();
415            JsonGenerator jsonGen = factory.createGenerator(out);
416            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
417            IndexRequest request = new IndexRequest(getWriteIndexForRepository(cmd.getRepositoryName()), DOC_TYPE,
418                    cmd.getTargetDocumentId()).source(jsonBuilder(out));
419            if (useExternalVersion && cmd.getOrder() > 0) {
420                request.versionType(VersionType.EXTERNAL).version(cmd.getOrder());
421            }
422            return request;
423        } catch (IOException e) {
424            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
425        }
426    }
427
428    protected String getWriteIndexForRepository(String repository) {
429        return esa.getWriteIndexName(esa.getIndexNameForRepository(repository));
430    }
431
432}