001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Arrays;
024import java.util.HashMap;
025import java.util.Map;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.http.HttpStatus;
030import org.apache.http.util.EntityUtils;
031import org.elasticsearch.ElasticsearchStatusException;
032import org.elasticsearch.action.bulk.BulkProcessor;
033import org.elasticsearch.action.bulk.BulkRequest;
034import org.elasticsearch.action.bulk.BulkResponse;
035import org.elasticsearch.action.delete.DeleteRequest;
036import org.elasticsearch.action.delete.DeleteResponse;
037import org.elasticsearch.action.get.GetRequest;
038import org.elasticsearch.action.get.GetResponse;
039import org.elasticsearch.action.index.IndexRequest;
040import org.elasticsearch.action.index.IndexResponse;
041import org.elasticsearch.action.search.ClearScrollRequest;
042import org.elasticsearch.action.search.ClearScrollResponse;
043import org.elasticsearch.action.search.SearchRequest;
044import org.elasticsearch.action.search.SearchResponse;
045import org.elasticsearch.action.search.SearchScrollRequest;
046import org.elasticsearch.client.Request;
047import org.elasticsearch.client.RequestOptions;
048import org.elasticsearch.client.Response;
049import org.elasticsearch.client.RestClient;
050import org.elasticsearch.client.RestHighLevelClient;
051import org.elasticsearch.cluster.health.ClusterHealthStatus;
052import org.elasticsearch.common.xcontent.XContentHelper;
053import org.elasticsearch.common.xcontent.XContentType;
054import org.elasticsearch.rest.RestStatus;
055import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
056import org.nuxeo.ecm.core.api.NuxeoException;
057import org.nuxeo.elasticsearch.api.ESClient;
058
059import io.opencensus.common.Scope;
060import io.opencensus.trace.AttributeValue;
061import io.opencensus.trace.Span;
062import io.opencensus.trace.Tracing;
063
064/**
065 * @since 9.3
066 */
067public class ESRestClient implements ESClient {
068    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
069    private static final Log log = LogFactory.getLog(ESRestClient.class);
070
071    public static final String CREATE_INDEX_TIMEOUT = "60s";
072
073    protected RestClient lowLevelClient;
074
075    protected RestHighLevelClient client;
076
077    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
078        this.lowLevelClient = lowLevelRestClient;
079        this.client = client;
080    }
081
082    @Override
083    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
084        ClusterHealthStatus healthStatus;
085        Response response;
086        try {
087            response = performRequestWithTracing(
088                    new Request("GET", String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
089                            getIndexesAsString(indexNames), timeoutSecond)));
090            try (InputStream is = response.getEntity().getContent()) {
091                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
092                healthStatus = ClusterHealthStatus.fromString((String) map.get("status"));
093            }
094        } catch (IOException e) {
095            throw new NuxeoException(e);
096        }
097        switch (healthStatus) {
098        case GREEN:
099            log.info("Elasticsearch Cluster ready: " + response);
100            return true;
101        case YELLOW:
102            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
103            return false;
104        default:
105            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
106                    + timeoutSecond + " give up: " + response;
107            throw new IllegalStateException(error);
108        }
109    }
110
111    protected String getIndexesAsString(String[] indexNames) {
112        return indexNames == null ? "" : String.join(",", indexNames);
113    }
114
115    @Override
116    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
117        Response response = performRequestWithTracing(
118                new Request("GET", String.format("/_cluster/health/%s", getIndexesAsString(indexNames))));
119        try (InputStream is = response.getEntity().getContent()) {
120                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
121                return ClusterHealthStatus.fromString((String) map.get("status"));
122        } catch (IOException e) {
123            throw new NuxeoException(e);
124        }
125    }
126
127    @Override
128    public void refresh(String indexName) {
129        performRequestWithTracing(new Request("POST", "/" + indexName + "/_refresh"));
130    }
131
132    @Override
133    public void flush(String indexName) {
134        performRequestWithTracing(new Request("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"));
135    }
136
137    @Override
138    public void optimize(String indexName) {
139        performRequestWithTracing(new Request("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"));
140    }
141
142    @Override
143    public boolean indexExists(String indexName) {
144        Response response = performRequestWithTracing(new Request("HEAD", "/" + indexName));
145        switch (response.getStatusLine().getStatusCode()) {
146        case HttpStatus.SC_OK:
147            return true;
148        case HttpStatus.SC_NOT_FOUND:
149            return false;
150        default:
151            throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response));
152        }
153    }
154
155    @Override
156    public boolean mappingExists(String indexName, String type) {
157        // HEAD is not supported anymore since elastic 7.x
158        Response response = performRequestWithTracing(
159                new Request("GET", String.format("/%s/_mapping", indexName)));
160        switch (response.getStatusLine().getStatusCode()) {
161        case HttpStatus.SC_OK:
162            return true;
163        case HttpStatus.SC_NOT_FOUND:
164            return false;
165        default:
166            throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response));
167        }
168    }
169
170    @Override
171    public void deleteIndex(String indexName, int timeoutSecond) {
172        Response response;
173        try {
174            response = lowLevelClient.performRequest(
175                    new Request("DELETE", String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)));
176        } catch (IOException e) {
177            if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) {
178                // when trying to delete an alias, throws the same exception than the transport client
179                throw new IllegalArgumentException(e);
180            }
181            throw new NuxeoException(e);
182        }
183        int code = response.getStatusLine().getStatusCode();
184        if (code != HttpStatus.SC_OK) {
185            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
186        }
187    }
188
189    @Override
190    public void createIndex(String indexName, String jsonSettings) {
191        Request request = new Request("PUT", "/" + indexName + "?timeout=" + CREATE_INDEX_TIMEOUT);
192        // since elastic 7 REST API needs an additional level
193        request.setJsonEntity("{\"settings\": " + jsonSettings + "}");
194        Response response = performRequestWithTracing(request);
195        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
196            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
197        }
198    }
199
200    @Override
201    public void createMapping(String indexName, String type, String jsonMapping) {
202        Request request = new Request("PUT", String.format("/%s/_mapping", indexName));
203        request.setJsonEntity(jsonMapping);
204        Response response = performRequestWithTracing(request);
205        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
206            throw new NuxeoException(String.format("Fail to create mapping on %s: %s", indexName, response));
207        }
208    }
209
210    protected Response performRequest(Request request) {
211        try {
212            return lowLevelClient.performRequest(request);
213        } catch (IOException e) {
214            throw new NuxeoException(e);
215        }
216    }
217
218    protected Response performRequestWithTracing(Request request) {
219        try (Scope ignored = getScopedSpan("elastic" + request.getEndpoint(), request.toString())) {
220            return performRequest(request);
221        }
222    }
223
224    @Override
225    public String getNodesInfo() {
226        Response response = performRequestWithTracing(new Request("GET", "/_nodes/_all"));
227        try {
228            return EntityUtils.toString(response.getEntity());
229        } catch (IOException e) {
230            throw new NuxeoException(e);
231        }
232    }
233
234    @Override
235    public String getNodesStats() {
236        Response response = performRequestWithTracing(new Request("GET", "/_nodes/stats"));
237        try {
238            return EntityUtils.toString(response.getEntity());
239        } catch (IOException e) {
240            throw new NuxeoException(e);
241        }
242    }
243
244    @Override
245    public boolean aliasExists(String aliasName) {
246        Response response = performRequestWithTracing(new Request("HEAD", String.format("/_alias/%s", aliasName)));
247        switch (response.getStatusLine().getStatusCode()) {
248        case HttpStatus.SC_OK:
249            return true;
250        case HttpStatus.SC_NOT_FOUND:
251            return false;
252        default:
253            throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response));
254        }
255    }
256
257    @Override
258    public String getFirstIndexForAlias(String aliasName) {
259        if (!aliasExists(aliasName)) {
260            return null;
261        }
262        Response response = performRequestWithTracing(new Request("GET", String.format("/_alias/%s", aliasName)));
263        try (InputStream is = response.getEntity().getContent()) {
264            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
265            if (map.size() != 1) {
266                throw new NuxeoException(String.format(
267                        "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
268            }
269            return map.keySet().iterator().next();
270        } catch (IOException e) {
271            throw new NuxeoException(e);
272        }
273    }
274
275    @Override
276    public void updateAlias(String aliasName, String indexName) {
277        // TODO do this in a single call to make it atomically
278        if (aliasExists(aliasName)) {
279            deleteAlias(aliasName);
280        }
281        if (indexExists(aliasName)) {
282            throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName);
283        }
284        createAlias(aliasName, indexName);
285    }
286
287    protected void deleteAlias(String aliasName) {
288        Response response = performRequestWithTracing(
289                new Request("DELETE", String.format("/_all/_alias/%s", aliasName)));
290        int code = response.getStatusLine().getStatusCode();
291        if (code != HttpStatus.SC_OK) {
292            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
293        }
294    }
295
296    protected void createAlias(String aliasName, String indexName) {
297        Response response = performRequestWithTracing(
298                new Request("PUT", String.format("/%s/_alias/%s", indexName, aliasName)));
299        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
300            throw new NuxeoException("Fail to create alias: " + indexName + " :" + response);
301        }
302    }
303
304    @Override
305    public BulkResponse bulk(BulkRequest request) {
306        try (Scope ignored = getScopedSpan("elastic/_bulk", "actions: " + request.numberOfActions())) {
307            return client.bulk(request, RequestOptions.DEFAULT);
308        } catch (IOException e) {
309            throw new NuxeoException(e);
310        }
311    }
312
313    @Override
314    public DeleteResponse delete(DeleteRequest request) {
315        try {
316            return client.delete(request, RequestOptions.DEFAULT);
317        } catch (IOException e) {
318            throw new NuxeoException(e);
319        }
320    }
321
322    @Override
323    public SearchResponse search(SearchRequest request) {
324        try (Scope ignored = getScopedSpan("elastic/_search", request.toString())) {
325            return client.search(request, RequestOptions.DEFAULT);
326        } catch (IOException | ElasticsearchStatusException e) {
327            // ElasticsearchStatusException is raised when using phrase prefix on keyword type
328            throw new NuxeoException(e);
329        }
330    }
331
332    @Override
333    public SearchResponse searchScroll(SearchScrollRequest request) {
334        try (Scope ignored = getScopedSpan("elastic/_scroll", request.toString())) {
335            return client.scroll(request, RequestOptions.DEFAULT);
336        } catch (IOException e) {
337            throw new NuxeoException(e);
338        }
339    }
340
341    @Override
342    public GetResponse get(GetRequest request) {
343        try (Scope ignored = getScopedSpan("elastic/_get", request.toString())) {
344            return client.get(request, RequestOptions.DEFAULT);
345        } catch (IOException e) {
346            throw new NuxeoException(e);
347        }
348    }
349
350    @Override
351    public IndexResponse index(IndexRequest request) {
352        try (Scope ignored = getScopedSpan("elastic/_index", request.toString())) {
353            return client.index(request, RequestOptions.DEFAULT);
354        } catch (ElasticsearchStatusException e) {
355            if (RestStatus.CONFLICT.equals(e.status())) {
356                throw new ConcurrentUpdateException(e);
357            }
358            throw new NuxeoException(e);
359        } catch (IOException e) {
360            throw new NuxeoException(e);
361        }
362    }
363
364    protected Scope getScopedSpan(String name, String request) {
365        Scope scope = Tracing.getTracer().spanBuilder(name).setSpanKind(Span.Kind.CLIENT).startScopedSpan();
366        Map<String, AttributeValue> map = new HashMap<>();
367        map.put("thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
368        map.put("request", AttributeValue.stringAttributeValue(request));
369        Tracing.getTracer().getCurrentSpan().putAttributes(map);
370        return scope;
371    }
372
373    @Override
374    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
375        try {
376            if (log.isDebugEnabled()) {
377                log.debug(String.format("Clearing scroll ids: %s", Arrays.toString(request.getScrollIds().toArray())));
378            }
379            return client.clearScroll(request, RequestOptions.DEFAULT);
380        } catch (ElasticsearchStatusException e) {
381            if (RestStatus.NOT_FOUND.equals(e.status())) {
382                if (log.isDebugEnabled()) {
383                    log.debug(String.format("Scroll ids not found, they have certainly been already closed: %s",
384                            Arrays.toString(request.getScrollIds().toArray())));
385                }
386                return new ClearScrollResponse(true, 0);
387            }
388            throw new NuxeoException(e);
389        } catch (IOException e) {
390            throw new NuxeoException(e);
391        }
392    }
393
394    @Override
395    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener bulkListener) {
396        return BulkProcessor.builder((request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
397                bulkListener);
398    }
399
400    @Override
401    public void close() {
402        if (lowLevelClient != null) {
403            try {
404                lowLevelClient.close();
405            } catch (IOException e) {
406                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
407            }
408            lowLevelClient = null;
409        }
410        client = null;
411    }
412}