001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import com.codahale.metrics.MetricRegistry;
024import com.codahale.metrics.SharedMetricRegistries;
025import com.codahale.metrics.Timer;
026import com.codahale.metrics.Timer.Context;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.codehaus.jackson.JsonFactory;
030import org.codehaus.jackson.JsonGenerator;
031import org.elasticsearch.action.bulk.BulkItemResponse;
032import org.elasticsearch.action.bulk.BulkRequestBuilder;
033import org.elasticsearch.action.bulk.BulkResponse;
034import org.elasticsearch.action.delete.DeleteRequestBuilder;
035import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
036import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
037import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
038import org.elasticsearch.action.get.GetRequestBuilder;
039import org.elasticsearch.action.get.GetResponse;
040import org.elasticsearch.action.index.IndexRequestBuilder;
041import org.elasticsearch.common.xcontent.XContentBuilder;
042import org.elasticsearch.index.VersionType;
043import org.elasticsearch.index.engine.VersionConflictEngineException;
044import org.elasticsearch.index.query.FilterBuilders;
045import org.elasticsearch.index.query.QueryBuilder;
046import org.elasticsearch.index.query.QueryBuilders;
047import org.elasticsearch.rest.RestStatus;
048import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
049import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
050import org.nuxeo.ecm.core.api.DocumentModel;
051import org.nuxeo.ecm.core.api.DocumentNotFoundException;
052import org.nuxeo.ecm.core.api.NuxeoException;
053import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
054import org.nuxeo.elasticsearch.commands.IndexingCommand;
055import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
056import org.nuxeo.runtime.api.Framework;
057import org.nuxeo.runtime.metrics.MetricsService;
058
059import java.io.IOException;
060import java.util.HashSet;
061import java.util.List;
062import java.util.Set;
063
064import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
065import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
066import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
067import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
068
069/**
070 * @since 6.0
071 */
072public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
073    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);
074
075    private final ElasticSearchAdminImpl esa;
076
077    private final Timer deleteTimer;
078
079    private final Timer indexTimer;
080
081    private final Timer bulkIndexTimer;
082
083    private final boolean useExternalVersion;
084
085    private JsonESDocumentWriter jsonESDocumentWriter;
086
087    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
088        this.esa = esa;
089        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
090        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
091        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
092        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
093        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
094        this.useExternalVersion = esa.useExternalVersion();
095    }
096
097    /**
098     * @since 7.2
099     */
100    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
101        this(esa);
102        this.jsonESDocumentWriter = jsonESDocumentWriter;
103    }
104
105    @Override
106    public void runIndexingWorker(List<IndexingCommand> cmds) {
107        throw new UnsupportedOperationException("Not implemented");
108    }
109
110    @Override
111    public void runReindexingWorker(String repositoryName, String nxql) {
112        throw new UnsupportedOperationException("Not implemented");
113    }
114
115    @Override
116    public void indexNonRecursive(List<IndexingCommand> cmds) {
117        int nbCommands = cmds.size();
118        if (nbCommands == 1) {
119            indexNonRecursive(cmds.get(0));
120            return;
121        }
122        // simulate long indexing
123        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
124
125        processBulkDeleteCommands(cmds);
126        Context stopWatch = bulkIndexTimer.time();
127        try {
128            processBulkIndexCommands(cmds);
129        } finally {
130            stopWatch.stop();
131        }
132        esa.totalCommandProcessed.addAndGet(nbCommands);
133        refreshIfNeeded(cmds);
134    }
135
136    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
137        // Can be optimized with a single delete by query
138        for (IndexingCommand cmd : cmds) {
139            if (cmd.getType() == Type.DELETE) {
140                Context stopWatch = deleteTimer.time();
141                try {
142                    processDeleteCommand(cmd);
143                } finally {
144                    stopWatch.stop();
145                }
146            }
147        }
148    }
149
150    void processBulkIndexCommands(List<IndexingCommand> cmds) {
151        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
152        Set<String> docIds = new HashSet<>(cmds.size());
153        for (IndexingCommand cmd : cmds) {
154            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
155                continue;
156            }
157            if (! docIds.add(cmd.getTargetDocumentId()) ) {
158                // do not submit the same doc 2 times
159                continue;
160            }
161            try {
162                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
163                if (idxRequest != null) {
164                    bulkRequest.add(idxRequest);
165                }
166            } catch (ConcurrentUpdateException e) {
167                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
168            } catch (DocumentNotFoundException e) {
169                log.info("Ignore indexing command in bulk, doc does not exists anymore: " + cmd);
170            } catch (IllegalArgumentException e) {
171                log.error("Ignore indexing command in bulk, fail to create request: " + cmd, e);
172            }
173        }
174        if (bulkRequest.numberOfActions() > 0) {
175            if (log.isDebugEnabled()) {
176                log.debug(String.format(
177                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
178                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
179            }
180            BulkResponse response = bulkRequest.execute().actionGet();
181            if (response.hasFailures()) {
182                logBulkFailure(response);
183            }
184        }
185    }
186
187    protected void logBulkFailure(BulkResponse response) {
188        boolean isError = false;
189        StringBuilder sb = new StringBuilder();
190        sb.append("Ignore indexing of some docs more recent versions has already been indexed");
191        for (BulkItemResponse item : response.getItems()) {
192            if (item.isFailed()) {
193                if (item.getFailure().getStatus() == RestStatus.CONFLICT) {
194                    sb.append("\n  ").append(item.getFailureMessage());
195                } else {
196                    isError = true;
197                }
198            }
199        }
200        if (isError) {
201            log.error(response.buildFailureMessage());
202        } else {
203            log.info(sb);
204        }
205    }
206
207    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
208        for (IndexingCommand cmd : cmds) {
209            if (refreshIfNeeded(cmd))
210                return;
211        }
212    }
213
214    private boolean refreshIfNeeded(IndexingCommand cmd) {
215        if (cmd.isSync()) {
216            esa.refresh();
217            return true;
218        }
219        return false;
220    }
221
222    @Override
223    public void indexNonRecursive(IndexingCommand cmd) {
224        Type type = cmd.getType();
225        if (type == Type.UPDATE_DIRECT_CHILDREN) {
226            // the parent don't need to be indexed
227            return;
228        }
229        Context stopWatch = null;
230        try {
231            if (type == Type.DELETE) {
232                stopWatch = deleteTimer.time();
233                processDeleteCommand(cmd);
234            } else {
235                stopWatch = indexTimer.time();
236                processIndexCommand(cmd);
237            }
238            refreshIfNeeded(cmd);
239        } finally {
240            if (stopWatch != null) {
241                stopWatch.stop();
242            }
243            esa.totalCommandProcessed.incrementAndGet();
244        }
245    }
246
247    void processIndexCommand(IndexingCommand cmd) {
248        IndexRequestBuilder request;
249        try {
250            request = buildEsIndexingRequest(cmd);
251        } catch (DocumentNotFoundException e) {
252            request = null;
253        } catch (IllegalStateException e) {
254            log.error("Fail to create request for indexing command: " + cmd, e);
255            return;
256        }
257        if (request == null) {
258            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
259            return;
260        }
261        if (log.isDebugEnabled()) {
262            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
263                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
264                    request.request().toString()));
265        }
266        try {
267            request.execute().actionGet();
268        } catch (VersionConflictEngineException e) {
269            log.info("Ignore indexing of doc " + cmd.getTargetDocumentId() +
270                    " a more recent version has already been indexed: " + e.getMessage());
271        }
272    }
273
274    void processDeleteCommand(IndexingCommand cmd) {
275        if (cmd.isRecurse()) {
276            processDeleteCommandRecursive(cmd);
277        } else {
278            processDeleteCommandNonRecursive(cmd);
279        }
280    }
281
282    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
283        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
284        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
285        if (log.isDebugEnabled()) {
286            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
287                    DOC_TYPE, cmd.getTargetDocumentId()));
288        }
289        request.execute().actionGet();
290    }
291
292    void processDeleteCommandRecursive(IndexingCommand cmd) {
293        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
294        // we don't want to rely on target document because the document can be
295        // already removed
296        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
297        if (docPath == null) {
298            if (!Framework.isTestModeSet()) {
299                log.warn("Trying to delete a non existing doc: " + cmd.toString());
300            }
301            return;
302        }
303        QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath));
304        DeleteByQueryRequestBuilder deleteRequest = esa.getClient().prepareDeleteByQuery(indexName).setTypes(DOC_TYPE).setQuery(
305                query);
306        if (log.isDebugEnabled()) {
307            log.debug(String.format(
308                    "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName,
309                    DOC_TYPE, query.toString()));
310        }
311        DeleteByQueryResponse responses = deleteRequest.execute().actionGet();
312        for (IndexDeleteByQueryResponse response : responses) {
313            // there is no way to trace how many docs are removed
314            if (response.getFailedShards() > 0) {
315                log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(),
316                        response.getTotalShards()));
317            }
318        }
319    }
320
321    /**
322     * Return the ecm:path of an ES document or null if not found.
323     */
324    String getPathOfDocFromEs(String repository, String docId) {
325        String indexName = esa.getIndexNameForRepository(repository);
326        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
327        if (log.isDebugEnabled()) {
328            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'",
329                    indexName, DOC_TYPE, docId, PATH_FIELD));
330        }
331        GetResponse ret = getRequest.execute().actionGet();
332        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
333            // doc not found
334            return null;
335        }
336        return ret.getField(PATH_FIELD).getValue().toString();
337    }
338
339    /**
340     * Return indexing request or null if the doc does not exists anymore.
341     *
342     * @throws java.lang.IllegalStateException if the command is not attached to a session
343     */
344    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
345        DocumentModel doc = cmd.getTargetDocument();
346        if (doc == null) {
347            return null;
348        }
349        try {
350            JsonFactory factory = new JsonFactory();
351            XContentBuilder builder = jsonBuilder();
352            JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
353            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
354            IndexRequestBuilder ret = esa.getClient().prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
355                    cmd.getTargetDocumentId()).setSource(builder);
356            if (useExternalVersion && cmd.getOrder() > 0) {
357                ret.setVersionType(VersionType.EXTERNAL).setVersion(cmd.getOrder());
358            }
359            return ret;
360        } catch (IOException e) {
361            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
362        }
363    }
364
365}