001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Arrays;
024import java.util.Map;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.http.HttpStatus;
029import org.apache.http.util.EntityUtils;
030import org.elasticsearch.ElasticsearchStatusException;
031import org.elasticsearch.action.bulk.BulkProcessor;
032import org.elasticsearch.action.bulk.BulkRequest;
033import org.elasticsearch.action.bulk.BulkResponse;
034import org.elasticsearch.action.delete.DeleteRequest;
035import org.elasticsearch.action.delete.DeleteResponse;
036import org.elasticsearch.action.get.GetRequest;
037import org.elasticsearch.action.get.GetResponse;
038import org.elasticsearch.action.index.IndexRequest;
039import org.elasticsearch.action.index.IndexResponse;
040import org.elasticsearch.action.search.ClearScrollRequest;
041import org.elasticsearch.action.search.ClearScrollResponse;
042import org.elasticsearch.action.search.SearchRequest;
043import org.elasticsearch.action.search.SearchResponse;
044import org.elasticsearch.action.search.SearchScrollRequest;
045import org.elasticsearch.client.Request;
046import org.elasticsearch.client.RequestOptions;
047import org.elasticsearch.client.Response;
048import org.elasticsearch.client.RestClient;
049import org.elasticsearch.client.RestHighLevelClient;
050import org.elasticsearch.cluster.health.ClusterHealthStatus;
051import org.elasticsearch.common.xcontent.XContentHelper;
052import org.elasticsearch.common.xcontent.XContentType;
053import org.elasticsearch.rest.RestStatus;
054import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
055import org.nuxeo.ecm.core.api.NuxeoException;
056import org.nuxeo.elasticsearch.api.ESClient;
057
058/**
059 * @since 9.3
060 */
061public class ESRestClient implements ESClient {
062    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
063    private static final Log log = LogFactory.getLog(ESRestClient.class);
064
065    public static final String CREATE_INDEX_TIMEOUT = "60s";
066
067    protected RestClient lowLevelClient;
068
069    protected RestHighLevelClient client;
070
071    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
072        this.lowLevelClient = lowLevelRestClient;
073        this.client = client;
074    }
075
076    @Override
077    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
078        ClusterHealthStatus healthStatus;
079        Response response;
080        try {
081            response = performRequest(
082                    new Request("GET", String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
083                            getIndexesAsString(indexNames), timeoutSecond)));
084            try (InputStream is = response.getEntity().getContent()) {
085                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
086                healthStatus = ClusterHealthStatus.fromString((String) map.get("status"));
087            }
088        } catch (IOException e) {
089            throw new NuxeoException(e);
090        }
091        switch (healthStatus) {
092        case GREEN:
093            log.info("Elasticsearch Cluster ready: " + response);
094            return true;
095        case YELLOW:
096            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
097            return false;
098        default:
099            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
100                    + timeoutSecond + " give up: " + response;
101            throw new IllegalStateException(error);
102        }
103    }
104
105    protected String getIndexesAsString(String[] indexNames) {
106        return indexNames == null ? "" : String.join(",", indexNames);
107    }
108
109    @Override
110    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
111        Response response = performRequest(
112                new Request("GET", String.format("/_cluster/health/%s", getIndexesAsString(indexNames))));
113        try (InputStream is = response.getEntity().getContent()) {
114                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
115                return ClusterHealthStatus.fromString((String) map.get("status"));
116        } catch (IOException e) {
117            throw new NuxeoException(e);
118        }
119    }
120
121    @Override
122    public void refresh(String indexName) {
123        performRequest(new Request("POST", "/" + indexName + "/_refresh"));
124    }
125
126    @Override
127    public void flush(String indexName) {
128        performRequest(new Request("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"));
129    }
130
131    @Override
132    public void optimize(String indexName) {
133        performRequest(new Request("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"));
134    }
135
136    @Override
137    public boolean indexExists(String indexName) {
138        Response response = performRequest(new Request("HEAD", "/" + indexName));
139        switch (response.getStatusLine().getStatusCode()) {
140        case HttpStatus.SC_OK:
141            return true;
142        case HttpStatus.SC_NOT_FOUND:
143            return false;
144        default:
145            throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response));
146        }
147    }
148
149    @Override
150    public boolean mappingExists(String indexName, String type) {
151        Response response = performRequest(new Request("HEAD", String.format("/%s/_mapping/%s", indexName, type)));
152        switch (response.getStatusLine().getStatusCode()) {
153        case HttpStatus.SC_OK:
154            return true;
155        case HttpStatus.SC_NOT_FOUND:
156            return false;
157        default:
158            throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response));
159        }
160    }
161
162    @Override
163    public void deleteIndex(String indexName, int timeoutSecond) {
164        Response response;
165        try {
166            response = lowLevelClient.performRequest(
167                    new Request("DELETE", String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)));
168        } catch (IOException e) {
169            if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) {
170                // when trying to delete an alias, throws the same exception than the transport client
171                throw new IllegalArgumentException(e);
172            }
173            throw new NuxeoException(e);
174        }
175        int code = response.getStatusLine().getStatusCode();
176        if (code != HttpStatus.SC_OK) {
177            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
178        }
179    }
180
181    @Override
182    public void createIndex(String indexName, String jsonSettings) {
183        Request request = new Request("PUT", "/" + indexName + "?timeout=" + CREATE_INDEX_TIMEOUT);
184        request.setJsonEntity(jsonSettings);
185        Response response = performRequest(request);
186        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
187            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
188        }
189    }
190
191    @Override
192    public void createMapping(String indexName, String type, String jsonMapping) {
193        Request request = new Request("PUT", String.format("/%s/%s/_mapping", indexName, type));
194        request.setJsonEntity(jsonMapping);
195        Response response = performRequest(request);
196        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
197            throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response));
198        }
199    }
200
201    protected Response performRequest(Request request) {
202        try {
203            return lowLevelClient.performRequest(request);
204        } catch (IOException e) {
205            throw new NuxeoException(e);
206        }
207    }
208
209    @Override
210    public String getNodesInfo() {
211        Response response = performRequest(new Request("GET", "/_nodes/_all"));
212        try {
213            return EntityUtils.toString(response.getEntity());
214        } catch (IOException e) {
215            throw new NuxeoException(e);
216        }
217    }
218
219    @Override
220    public String getNodesStats() {
221        Response response = performRequest(new Request("GET", "/_nodes/stats"));
222        try {
223            return EntityUtils.toString(response.getEntity());
224        } catch (IOException e) {
225            throw new NuxeoException(e);
226        }
227    }
228
229    @Override
230    public boolean aliasExists(String aliasName) {
231        Response response = performRequest(new Request("HEAD", String.format("/_alias/%s", aliasName)));
232        switch (response.getStatusLine().getStatusCode()) {
233        case HttpStatus.SC_OK:
234            return true;
235        case HttpStatus.SC_NOT_FOUND:
236            return false;
237        default:
238            throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response));
239        }
240    }
241
242    @Override
243    public String getFirstIndexForAlias(String aliasName) {
244        if (!aliasExists(aliasName)) {
245            return null;
246        }
247        Response response = performRequest(new Request("GET", String.format("/_alias/%s", aliasName)));
248        try (InputStream is = response.getEntity().getContent()) {
249            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
250            if (map.size() != 1) {
251                throw new NuxeoException(String.format(
252                        "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
253            }
254            return map.keySet().iterator().next();
255        } catch (IOException e) {
256            throw new NuxeoException(e);
257        }
258    }
259
260    @Override
261    public void updateAlias(String aliasName, String indexName) {
262        // TODO do this in a single call to make it atomically
263        if (aliasExists(aliasName)) {
264            deleteAlias(aliasName);
265        }
266        if (indexExists(aliasName)) {
267            throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName);
268        }
269        createAlias(aliasName, indexName);
270    }
271
272    protected void deleteAlias(String aliasName) {
273        Response response = performRequest(new Request("DELETE", String.format("/_all/_alias/%s", aliasName)));
274        int code = response.getStatusLine().getStatusCode();
275        if (code != HttpStatus.SC_OK) {
276            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
277        }
278    }
279
280    protected void createAlias(String aliasName, String indexName) {
281        Response response = performRequest(new Request("PUT", String.format("/%s/_alias/%s", indexName, aliasName)));
282        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
283            throw new NuxeoException("Fail to create alias: " + indexName + " :" + response);
284        }
285    }
286
287    @Override
288    public BulkResponse bulk(BulkRequest request) {
289        try {
290            return client.bulk(request, RequestOptions.DEFAULT);
291        } catch (IOException e) {
292            throw new NuxeoException(e);
293        }
294    }
295
296    @Override
297    public DeleteResponse delete(DeleteRequest request) {
298        try {
299            return client.delete(request, RequestOptions.DEFAULT);
300        } catch (IOException e) {
301            throw new NuxeoException(e);
302        }
303    }
304
305    @Override
306    public SearchResponse search(SearchRequest request) {
307        try {
308            return client.search(request, RequestOptions.DEFAULT);
309        } catch (IOException e) {
310            throw new NuxeoException(e);
311        }
312    }
313
314    @Override
315    public SearchResponse searchScroll(SearchScrollRequest request) {
316        try {
317            return client.scroll(request, RequestOptions.DEFAULT);
318        } catch (IOException e) {
319            throw new NuxeoException(e);
320        }
321    }
322
323    @Override
324    public GetResponse get(GetRequest request) {
325        try {
326            return client.get(request, RequestOptions.DEFAULT);
327        } catch (IOException e) {
328            throw new NuxeoException(e);
329        }
330    }
331
332    @Override
333    public IndexResponse index(IndexRequest request) {
334        try {
335            return client.index(request, RequestOptions.DEFAULT);
336        } catch (ElasticsearchStatusException e) {
337            if (RestStatus.CONFLICT.equals(e.status())) {
338                throw new ConcurrentUpdateException(e);
339            }
340            throw new NuxeoException(e);
341        } catch (IOException e) {
342            throw new NuxeoException(e);
343        }
344    }
345
346    @Override
347    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
348        try {
349            if (log.isDebugEnabled()) {
350                log.debug(String.format("Clearing scroll ids: %s", Arrays.toString(request.getScrollIds().toArray())));
351            }
352            return client.clearScroll(request, RequestOptions.DEFAULT);
353        } catch (ElasticsearchStatusException e) {
354            if (RestStatus.NOT_FOUND.equals(e.status())) {
355                if (log.isDebugEnabled()) {
356                    log.debug(String.format("Scroll ids not found, they have certainly been already closed: %s",
357                            Arrays.toString(request.getScrollIds().toArray())));
358                }
359                return new ClearScrollResponse(true, 0);
360            }
361            throw new NuxeoException(e);
362        } catch (IOException e) {
363            throw new NuxeoException(e);
364        }
365    }
366
367    @Override
368    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener bulkListener) {
369        return BulkProcessor.builder((request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
370                bulkListener);
371    }
372
373    @Override
374    public void close() {
375        if (lowLevelClient != null) {
376            try {
377                lowLevelClient.close();
378            } catch (IOException e) {
379                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
380            }
381            lowLevelClient = null;
382        }
383        client = null;
384    }
385}