001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import com.codahale.metrics.MetricRegistry;
024import com.codahale.metrics.SharedMetricRegistries;
025import com.codahale.metrics.Timer;
026import com.codahale.metrics.Timer.Context;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.codehaus.jackson.JsonFactory;
030import org.codehaus.jackson.JsonGenerator;
031import org.elasticsearch.action.bulk.BulkRequestBuilder;
032import org.elasticsearch.action.bulk.BulkResponse;
033import org.elasticsearch.action.delete.DeleteRequestBuilder;
034import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
035import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
036import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
037import org.elasticsearch.action.get.GetRequestBuilder;
038import org.elasticsearch.action.get.GetResponse;
039import org.elasticsearch.action.index.IndexRequestBuilder;
040import org.elasticsearch.common.xcontent.XContentBuilder;
041import org.elasticsearch.index.query.FilterBuilders;
042import org.elasticsearch.index.query.QueryBuilder;
043import org.elasticsearch.index.query.QueryBuilders;
044import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
045import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
046import org.nuxeo.ecm.core.api.DocumentModel;
047import org.nuxeo.ecm.core.api.DocumentNotFoundException;
048import org.nuxeo.ecm.core.api.NuxeoException;
049import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
050import org.nuxeo.elasticsearch.commands.IndexingCommand;
051import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
052import org.nuxeo.runtime.api.Framework;
053import org.nuxeo.runtime.metrics.MetricsService;
054
055import java.io.IOException;
056import java.util.List;
057
058import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
059import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
060import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
061import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
062
063/**
064 * @since 6.0
065 */
066public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
067    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);
068
069    private final ElasticSearchAdminImpl esa;
070
071    private final Timer deleteTimer;
072
073    private final Timer indexTimer;
074
075    private final Timer bulkIndexTimer;
076
077    private JsonESDocumentWriter jsonESDocumentWriter;
078
079    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
080        this.esa = esa;
081        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
082        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
083        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
084        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
085        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
086    }
087
088    /**
089     * @since 7.2
090     */
091    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
092        this(esa);
093        this.jsonESDocumentWriter = jsonESDocumentWriter;
094    }
095
096    @Override
097    public void runIndexingWorker(List<IndexingCommand> cmds) {
098        throw new UnsupportedOperationException("Not implemented");
099    }
100
101    @Override
102    public void runReindexingWorker(String repositoryName, String nxql) {
103        throw new UnsupportedOperationException("Not implemented");
104    }
105
106    @Override
107    public void indexNonRecursive(List<IndexingCommand> cmds) {
108        int nbCommands = cmds.size();
109        if (nbCommands == 1) {
110            indexNonRecursive(cmds.get(0));
111            return;
112        }
113        // simulate long indexing
114        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
115
116        processBulkDeleteCommands(cmds);
117        Context stopWatch = bulkIndexTimer.time();
118        try {
119            processBulkIndexCommands(cmds);
120        } finally {
121            stopWatch.stop();
122        }
123        esa.totalCommandProcessed.addAndGet(nbCommands);
124        refreshIfNeeded(cmds);
125    }
126
127    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
128        // Can be optimized with a single delete by query
129        for (IndexingCommand cmd : cmds) {
130            if (cmd.getType() == Type.DELETE) {
131                Context stopWatch = deleteTimer.time();
132                try {
133                    processDeleteCommand(cmd);
134                } finally {
135                    stopWatch.stop();
136                }
137            }
138        }
139    }
140
141    void processBulkIndexCommands(List<IndexingCommand> cmds) {
142        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
143        for (IndexingCommand cmd : cmds) {
144            if (cmd.getType() == Type.DELETE || cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
145                continue;
146            }
147            try {
148                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
149                if (idxRequest != null) {
150                    bulkRequest.add(idxRequest);
151                }
152            } catch (ConcurrentUpdateException e) {
153                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
154            } catch (DocumentNotFoundException e) {
155                log.info("Skip indexing command to bulk, doc does not exists anymore: " + cmd);
156            } catch (IllegalArgumentException e) {
157                log.error("Skip indexing command to bulk, fail to create request: " + cmd, e);
158            }
159        }
160        if (bulkRequest.numberOfActions() > 0) {
161            if (log.isDebugEnabled()) {
162                log.debug(String.format(
163                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
164                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
165            }
166            BulkResponse response = bulkRequest.execute().actionGet();
167            if (response.hasFailures()) {
168                log.error(response.buildFailureMessage());
169            }
170        }
171    }
172
173    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
174        for (IndexingCommand cmd : cmds) {
175            if (refreshIfNeeded(cmd))
176                return;
177        }
178    }
179
180    private boolean refreshIfNeeded(IndexingCommand cmd) {
181        if (cmd.isSync()) {
182            esa.refresh();
183            return true;
184        }
185        return false;
186    }
187
188    @Override
189    public void indexNonRecursive(IndexingCommand cmd) {
190        Type type = cmd.getType();
191        if (type == Type.UPDATE_DIRECT_CHILDREN) {
192            // the parent don't need to be indexed
193            return;
194        }
195        Context stopWatch = null;
196        try {
197            if (type == Type.DELETE) {
198                stopWatch = deleteTimer.time();
199                processDeleteCommand(cmd);
200            } else {
201                stopWatch = indexTimer.time();
202                processIndexCommand(cmd);
203            }
204            refreshIfNeeded(cmd);
205        } finally {
206            if (stopWatch != null) {
207                stopWatch.stop();
208            }
209            esa.totalCommandProcessed.incrementAndGet();
210        }
211    }
212
213    void processIndexCommand(IndexingCommand cmd) {
214        IndexRequestBuilder request;
215        try {
216            request = buildEsIndexingRequest(cmd);
217        } catch (DocumentNotFoundException e) {
218            request = null;
219        } catch (IllegalStateException e) {
220            log.error("Fail to create request for indexing command: " + cmd, e);
221            return;
222        }
223        if (request == null) {
224            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
225            return;
226        }
227        if (log.isDebugEnabled()) {
228            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
229                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
230                    request.request().toString()));
231        }
232        request.execute().actionGet();
233    }
234
235    void processDeleteCommand(IndexingCommand cmd) {
236        if (cmd.isRecurse()) {
237            processDeleteCommandRecursive(cmd);
238        } else {
239            processDeleteCommandNonRecursive(cmd);
240        }
241    }
242
243    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
244        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
245        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
246        if (log.isDebugEnabled()) {
247            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
248                    DOC_TYPE, cmd.getTargetDocumentId()));
249        }
250        request.execute().actionGet();
251    }
252
253    void processDeleteCommandRecursive(IndexingCommand cmd) {
254        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
255        // we don't want to rely on target document because the document can be
256        // already removed
257        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
258        if (docPath == null) {
259            if (!Framework.isTestModeSet()) {
260                log.warn("Trying to delete a non existing doc: " + cmd.toString());
261            }
262            return;
263        }
264        QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath));
265        DeleteByQueryRequestBuilder deleteRequest = esa.getClient().prepareDeleteByQuery(indexName).setTypes(DOC_TYPE).setQuery(
266                query);
267        if (log.isDebugEnabled()) {
268            log.debug(String.format(
269                    "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName,
270                    DOC_TYPE, query.toString()));
271        }
272        DeleteByQueryResponse responses = deleteRequest.execute().actionGet();
273        for (IndexDeleteByQueryResponse response : responses) {
274            // there is no way to trace how many docs are removed
275            if (response.getFailedShards() > 0) {
276                log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(),
277                        response.getTotalShards()));
278            }
279        }
280    }
281
282    /**
283     * Return the ecm:path of an ES document or null if not found.
284     */
285    String getPathOfDocFromEs(String repository, String docId) {
286        String indexName = esa.getIndexNameForRepository(repository);
287        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
288        if (log.isDebugEnabled()) {
289            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'",
290                    indexName, DOC_TYPE, docId, PATH_FIELD));
291        }
292        GetResponse ret = getRequest.execute().actionGet();
293        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
294            // doc not found
295            return null;
296        }
297        return ret.getField(PATH_FIELD).getValue().toString();
298    }
299
300    /**
301     * Return indexing request or null if the doc does not exists anymore.
302     *
303     * @throws java.lang.IllegalStateException if the command is not attached to a session
304     */
305    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
306        DocumentModel doc = cmd.getTargetDocument();
307        if (doc == null) {
308            return null;
309        }
310        try {
311            JsonFactory factory = new JsonFactory();
312            XContentBuilder builder = jsonBuilder();
313            JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
314            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
315            return esa.getClient().prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
316                    cmd.getTargetDocumentId()).setSource(builder);
317        } catch (IOException e) {
318            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
319        }
320    }
321
322}