001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     bdelbosc
017 */
018
019package org.nuxeo.elasticsearch.core;
020
021import com.codahale.metrics.MetricRegistry;
022import com.codahale.metrics.SharedMetricRegistries;
023import com.codahale.metrics.Timer;
024import com.codahale.metrics.Timer.Context;
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.codehaus.jackson.JsonFactory;
028import org.codehaus.jackson.JsonGenerator;
029import org.elasticsearch.action.bulk.BulkRequestBuilder;
030import org.elasticsearch.action.bulk.BulkResponse;
031import org.elasticsearch.action.delete.DeleteRequestBuilder;
032import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
033import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
034import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
035import org.elasticsearch.action.get.GetRequestBuilder;
036import org.elasticsearch.action.get.GetResponse;
037import org.elasticsearch.action.index.IndexRequestBuilder;
038import org.elasticsearch.common.xcontent.XContentBuilder;
039import org.elasticsearch.index.query.FilterBuilders;
040import org.elasticsearch.index.query.QueryBuilder;
041import org.elasticsearch.index.query.QueryBuilders;
042import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
043import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
044import org.nuxeo.ecm.core.api.DocumentModel;
045import org.nuxeo.ecm.core.api.DocumentNotFoundException;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
048import org.nuxeo.elasticsearch.commands.IndexingCommand;
049import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
050import org.nuxeo.runtime.api.Framework;
051import org.nuxeo.runtime.metrics.MetricsService;
052
053import java.io.IOException;
054import java.util.List;
055
056import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
057import static org.nuxeo.elasticsearch.ElasticSearchConstants.CHILDREN_FIELD;
058import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
059import static org.nuxeo.elasticsearch.ElasticSearchConstants.PATH_FIELD;
060
061/**
062 * @since 6.0
063 */
064public class ElasticSearchIndexingImpl implements ElasticSearchIndexing {
065    private static final Log log = LogFactory.getLog(ElasticSearchIndexingImpl.class);
066
067    private final ElasticSearchAdminImpl esa;
068
069    private final Timer deleteTimer;
070
071    private final Timer indexTimer;
072
073    private final Timer bulkIndexTimer;
074
075    private JsonESDocumentWriter jsonESDocumentWriter;
076
077    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa) {
078        this.esa = esa;
079        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
080        indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
081        deleteTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "delete"));
082        bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
083        this.jsonESDocumentWriter = new JsonESDocumentWriter();// default writer
084    }
085
086    /**
087     * @since 7.2
088     */
089    public ElasticSearchIndexingImpl(ElasticSearchAdminImpl esa, JsonESDocumentWriter jsonESDocumentWriter) {
090        this(esa);
091        this.jsonESDocumentWriter = jsonESDocumentWriter;
092    }
093
094    @Override
095    public void runIndexingWorker(List<IndexingCommand> cmds) {
096        throw new UnsupportedOperationException("Not implemented");
097    }
098
099    @Override
100    public void runReindexingWorker(String repositoryName, String nxql) {
101        throw new UnsupportedOperationException("Not implemented");
102    }
103
104    @Override
105    public void indexNonRecursive(List<IndexingCommand> cmds) {
106        int nbCommands = cmds.size();
107        if (nbCommands == 1) {
108            indexNonRecursive(cmds.get(0));
109            return;
110        }
111        // simulate long indexing
112        // try {Thread.sleep(1000);} catch (InterruptedException e) { }
113
114        processBulkDeleteCommands(cmds);
115        Context stopWatch = bulkIndexTimer.time();
116        try {
117            processBulkIndexCommands(cmds);
118        } finally {
119            stopWatch.stop();
120        }
121        esa.totalCommandProcessed.addAndGet(nbCommands);
122        refreshIfNeeded(cmds);
123    }
124
125    void processBulkDeleteCommands(List<IndexingCommand> cmds) {
126        // Can be optimized with a single delete by query
127        for (IndexingCommand cmd : cmds) {
128            if (cmd.getType() == Type.DELETE) {
129                Context stopWatch = deleteTimer.time();
130                try {
131                    processDeleteCommand(cmd);
132                } finally {
133                    stopWatch.stop();
134                }
135            }
136        }
137    }
138
139    void processBulkIndexCommands(List<IndexingCommand> cmds) {
140        BulkRequestBuilder bulkRequest = esa.getClient().prepareBulk();
141        for (IndexingCommand cmd : cmds) {
142            if (cmd.getType() == Type.DELETE) {
143                continue;
144            }
145            try {
146                IndexRequestBuilder idxRequest = buildEsIndexingRequest(cmd);
147                if (idxRequest != null) {
148                    bulkRequest.add(idxRequest);
149                }
150            } catch (ConcurrentUpdateException e) {
151                throw e; // bubble up, usually until AbstractWork catches it and maybe retries
152            } catch (DocumentNotFoundException e) {
153                log.info("Skip indexing command to bulk, doc does not exists anymore: " + cmd);
154            } catch (IllegalArgumentException e) {
155                log.error("Skip indexing command to bulk, fail to create request: " + cmd, e);
156            }
157        }
158        if (bulkRequest.numberOfActions() > 0) {
159            if (log.isDebugEnabled()) {
160                log.debug(String.format(
161                        "Index %d docs in bulk request: curl -XPOST 'http://localhost:9200/_bulk' -d '%s'",
162                        bulkRequest.numberOfActions(), bulkRequest.request().requests().toString()));
163            }
164            BulkResponse response = bulkRequest.execute().actionGet();
165            if (response.hasFailures()) {
166                log.error(response.buildFailureMessage());
167            }
168        }
169    }
170
171    protected void refreshIfNeeded(List<IndexingCommand> cmds) {
172        for (IndexingCommand cmd : cmds) {
173            if (refreshIfNeeded(cmd))
174                return;
175        }
176    }
177
178    private boolean refreshIfNeeded(IndexingCommand cmd) {
179        if (cmd.isSync()) {
180            esa.refresh();
181            return true;
182        }
183        return false;
184    }
185
186    @Override
187    public void indexNonRecursive(IndexingCommand cmd) {
188        Context stopWatch = null;
189        try {
190            if (cmd.getType() == Type.DELETE) {
191                stopWatch = deleteTimer.time();
192                processDeleteCommand(cmd);
193            } else {
194                stopWatch = indexTimer.time();
195                processIndexCommand(cmd);
196            }
197            refreshIfNeeded(cmd);
198        } finally {
199            if (stopWatch != null) {
200                stopWatch.stop();
201            }
202            esa.totalCommandProcessed.incrementAndGet();
203        }
204    }
205
206    void processIndexCommand(IndexingCommand cmd) {
207        IndexRequestBuilder request;
208        try {
209            request = buildEsIndexingRequest(cmd);
210        } catch (DocumentNotFoundException e) {
211            request = null;
212        } catch (IllegalStateException e) {
213            log.error("Fail to create request for indexing command: " + cmd, e);
214            return;
215        }
216        if (request == null) {
217            log.info("Cancel indexing command because target document does not exists anymore: " + cmd);
218            return;
219        }
220        if (log.isDebugEnabled()) {
221            log.debug(String.format("Index request: curl -XPUT 'http://localhost:9200/%s/%s/%s' -d '%s'",
222                    esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE, cmd.getTargetDocumentId(),
223                    request.request().toString()));
224        }
225        request.execute().actionGet();
226    }
227
228    void processDeleteCommand(IndexingCommand cmd) {
229        if (cmd.isRecurse()) {
230            processDeleteCommandRecursive(cmd);
231        } else {
232            processDeleteCommandNonRecursive(cmd);
233        }
234    }
235
236    void processDeleteCommandNonRecursive(IndexingCommand cmd) {
237        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
238        DeleteRequestBuilder request = esa.getClient().prepareDelete(indexName, DOC_TYPE, cmd.getTargetDocumentId());
239        if (log.isDebugEnabled()) {
240            log.debug(String.format("Delete request: curl -XDELETE 'http://localhost:9200/%s/%s/%s'", indexName,
241                    DOC_TYPE, cmd.getTargetDocumentId()));
242        }
243        request.execute().actionGet();
244    }
245
246    void processDeleteCommandRecursive(IndexingCommand cmd) {
247        String indexName = esa.getIndexNameForRepository(cmd.getRepositoryName());
248        // we don't want to rely on target document because the document can be
249        // already removed
250        String docPath = getPathOfDocFromEs(cmd.getRepositoryName(), cmd.getTargetDocumentId());
251        if (docPath == null) {
252            if (!Framework.isTestModeSet()) {
253                log.warn("Trying to delete a non existing doc: " + cmd.toString());
254            }
255            return;
256        }
257        QueryBuilder query = QueryBuilders.constantScoreQuery(FilterBuilders.termFilter(CHILDREN_FIELD, docPath));
258        DeleteByQueryRequestBuilder deleteRequest = esa.getClient().prepareDeleteByQuery(indexName).setTypes(DOC_TYPE).setQuery(
259                query);
260        if (log.isDebugEnabled()) {
261            log.debug(String.format(
262                    "Delete byQuery request: curl -XDELETE 'http://localhost:9200/%s/%s/_query' -d '%s'", indexName,
263                    DOC_TYPE, query.toString()));
264        }
265        DeleteByQueryResponse responses = deleteRequest.execute().actionGet();
266        for (IndexDeleteByQueryResponse response : responses) {
267            // there is no way to trace how many docs are removed
268            if (response.getFailedShards() > 0) {
269                log.error(String.format("Delete byQuery fails on shard: %d out of %d", response.getFailedShards(),
270                        response.getTotalShards()));
271            }
272        }
273    }
274
275    /**
276     * Return the ecm:path of an ES document or null if not found.
277     */
278    String getPathOfDocFromEs(String repository, String docId) {
279        String indexName = esa.getIndexNameForRepository(repository);
280        GetRequestBuilder getRequest = esa.getClient().prepareGet(indexName, DOC_TYPE, docId).setFields(PATH_FIELD);
281        if (log.isDebugEnabled()) {
282            log.debug(String.format("Get path of doc: curl -XGET 'http://localhost:9200/%s/%s/%s?fields=%s'",
283                    indexName, DOC_TYPE, docId, PATH_FIELD));
284        }
285        GetResponse ret = getRequest.execute().actionGet();
286        if (!ret.isExists() || ret.getField(PATH_FIELD) == null) {
287            // doc not found
288            return null;
289        }
290        return ret.getField(PATH_FIELD).getValue().toString();
291    }
292
293    /**
294     * Return indexing request or null if the doc does not exists anymore.
295     *
296     * @throws java.lang.IllegalStateException if the command is not attached to a session
297     */
298    IndexRequestBuilder buildEsIndexingRequest(IndexingCommand cmd) {
299        DocumentModel doc = cmd.getTargetDocument();
300        if (doc == null) {
301            return null;
302        }
303        try {
304            JsonFactory factory = new JsonFactory();
305            XContentBuilder builder = jsonBuilder();
306            JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
307            jsonESDocumentWriter.writeESDocument(jsonGen, doc, cmd.getSchemas(), null);
308            return esa.getClient().prepareIndex(esa.getIndexNameForRepository(cmd.getRepositoryName()), DOC_TYPE,
309                    cmd.getTargetDocumentId()).setSource(builder);
310        } catch (IOException e) {
311            throw new NuxeoException("Unable to create index request for Document " + cmd.getTargetDocumentId(), e);
312        }
313    }
314
315}