001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import static java.util.Collections.emptyMap;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.util.Arrays;
026import java.util.Map;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.http.HttpEntity;
031import org.apache.http.HttpStatus;
032import org.apache.http.entity.ContentType;
033import org.apache.http.nio.entity.NStringEntity;
034import org.apache.http.util.EntityUtils;
035import org.elasticsearch.ElasticsearchStatusException;
036import org.elasticsearch.action.bulk.BulkRequest;
037import org.elasticsearch.action.bulk.BulkResponse;
038import org.elasticsearch.action.delete.DeleteRequest;
039import org.elasticsearch.action.delete.DeleteResponse;
040import org.elasticsearch.action.get.GetRequest;
041import org.elasticsearch.action.get.GetResponse;
042import org.elasticsearch.action.index.IndexRequest;
043import org.elasticsearch.action.index.IndexResponse;
044import org.elasticsearch.action.search.ClearScrollRequest;
045import org.elasticsearch.action.search.ClearScrollResponse;
046import org.elasticsearch.action.search.SearchRequest;
047import org.elasticsearch.action.search.SearchResponse;
048import org.elasticsearch.action.search.SearchScrollRequest;
049import org.elasticsearch.client.Response;
050import org.elasticsearch.client.ResponseException;
051import org.elasticsearch.client.RestClient;
052import org.elasticsearch.client.RestHighLevelClient;
053import org.elasticsearch.cluster.health.ClusterHealthStatus;
054import org.elasticsearch.common.xcontent.XContentHelper;
055import org.elasticsearch.common.xcontent.XContentType;
056import org.elasticsearch.index.engine.VersionConflictEngineException;
057import org.elasticsearch.rest.RestStatus;
058import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
059import org.nuxeo.ecm.core.api.NuxeoException;
060import org.nuxeo.elasticsearch.api.ESClient;
061
062/**
063 * @since 9.3
064 */
065public class ESRestClient implements ESClient {
066    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
067    private static final Log log = LogFactory.getLog(ESRestClient.class);
068
069    protected RestClient lowLevelClient;
070
071    protected RestHighLevelClient client;
072
073    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
074        this.lowLevelClient = lowLevelRestClient;
075        this.client = client;
076    }
077
078    @Override
079    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
080        ClusterHealthStatus healthStatus;
081        Response response;
082        try {
083            response = lowLevelClient.performRequest("GET",
084                    String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
085                            getIndexesAsString(indexNames), timeoutSecond));
086            try (InputStream is = response.getEntity().getContent()) {
087                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
088                healthStatus = ClusterHealthStatus.fromString((String) map.get("status"));
089            }
090        } catch (IOException e) {
091            throw new NuxeoException(e);
092        }
093        switch (healthStatus) {
094        case GREEN:
095            log.info("Elasticsearch Cluster ready: " + response);
096            return true;
097        case YELLOW:
098            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
099            return false;
100        default:
101            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
102                    + timeoutSecond + " give up: " + response;
103            throw new IllegalStateException(error);
104        }
105    }
106
107    protected String getIndexesAsString(String[] indexNames) {
108        return indexNames == null ? "" : String.join(",", indexNames);
109    }
110
111    @Override
112    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
113        try {
114            Response response = lowLevelClient.performRequest("GET",
115                    String.format("/_cluster/health/%s", getIndexesAsString(indexNames)));
116            try (InputStream is = response.getEntity().getContent()) {
117                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
118                return ClusterHealthStatus.fromString((String) map.get("status"));
119            }
120        } catch (IOException e) {
121            throw new NuxeoException(e);
122        }
123    }
124
125    @Override
126    public void refresh(String indexName) {
127        try {
128            lowLevelClient.performRequest("POST", "/" + indexName + "/_refresh");
129        } catch (IOException e) {
130            throw new NuxeoException(e);
131        }
132
133    }
134
135    @Override
136    public void flush(String indexName) {
137        try {
138            lowLevelClient.performRequest("POST", "/" + indexName + "/_flush?wait_if_ongoing=true");
139        } catch (IOException e) {
140            throw new NuxeoException(e);
141        }
142    }
143
144    @Override
145    public void optimize(String indexName) {
146        try {
147            lowLevelClient.performRequest("POST", "/" + indexName + "/_forcemerge?max_num_segments=1");
148        } catch (IOException e) {
149            throw new NuxeoException(e);
150        }
151    }
152
153    @Override
154    public boolean indexExists(String indexName) {
155        Response response;
156        try {
157            response = lowLevelClient.performRequest("HEAD", "/" + indexName);
158        } catch (IOException e) {
159            throw new NuxeoException(e);
160        }
161        int code = response.getStatusLine().getStatusCode();
162        if (code == HttpStatus.SC_OK) {
163            return true;
164        } else if (code == HttpStatus.SC_NOT_FOUND) {
165            return false;
166        }
167        throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response));
168    }
169
170    @Override
171    public boolean mappingExists(String indexName, String type) {
172        Response response;
173        try {
174            response = lowLevelClient.performRequest("HEAD", String.format("/%s/_mapping/%s", indexName, type));
175        } catch (IOException e) {
176            throw new NuxeoException(e);
177        }
178        int code = response.getStatusLine().getStatusCode();
179        if (code == HttpStatus.SC_OK) {
180            return true;
181        } else if (code == HttpStatus.SC_NOT_FOUND) {
182            return false;
183        }
184        throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response));
185    }
186
187    @Override
188    public void deleteIndex(String indexName, int timeoutSecond) {
189        Response response;
190        try {
191            response = lowLevelClient.performRequest("DELETE",
192                    String.format("/%s?master_timeout=%ds", indexName, timeoutSecond));
193        } catch (IOException e) {
194            if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) {
195                // when trying to delete an alias, throws the same exception than the transport client
196                throw new IllegalArgumentException(e);
197            }
198            throw new NuxeoException(e);
199        }
200        int code = response.getStatusLine().getStatusCode();
201        if (code != HttpStatus.SC_OK) {
202            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
203        }
204    }
205
206    @Override
207    public void createIndex(String indexName, String jsonSettings) {
208        HttpEntity entity = new NStringEntity(jsonSettings, ContentType.APPLICATION_JSON);
209        Response response;
210        try {
211            response = lowLevelClient.performRequest("PUT", "/" + indexName, emptyMap(), entity);
212        } catch (IOException e) {
213            throw new NuxeoException(e);
214        }
215        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
216            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
217        }
218    }
219
220    @Override
221    public void createMapping(String indexName, String type, String jsonMapping) {
222        HttpEntity entity = new NStringEntity(jsonMapping, ContentType.APPLICATION_JSON);
223        Response response;
224        try {
225            response = lowLevelClient.performRequest("PUT", String.format("/%s/%s/_mapping", indexName, type),
226                    emptyMap(), entity);
227        } catch (IOException e) {
228            throw new NuxeoException(e);
229        }
230        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
231            throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response));
232        }
233
234    }
235
236    @Override
237    public String getNodesInfo() {
238        try {
239            Response response = lowLevelClient.performRequest("GET", "/_nodes/_all");
240            return EntityUtils.toString(response.getEntity());
241        } catch (IOException e) {
242            throw new NuxeoException(e);
243        }
244    }
245
246    @Override
247    public String getNodesStats() {
248        try {
249            Response response = lowLevelClient.performRequest("GET", "/_nodes/stats");
250            return EntityUtils.toString(response.getEntity());
251        } catch (IOException e) {
252            throw new NuxeoException(e);
253        }
254    }
255
256    @Override
257    public boolean aliasExists(String aliasName) {
258        Response response;
259        try {
260            response = lowLevelClient.performRequest("HEAD", String.format("/_alias/%s", aliasName));
261        } catch (IOException e) {
262            throw new NuxeoException(e);
263        }
264        int code = response.getStatusLine().getStatusCode();
265        if (code == HttpStatus.SC_OK) {
266            return true;
267        } else if (code == HttpStatus.SC_NOT_FOUND) {
268            return false;
269        }
270        throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response));
271    }
272
273    @Override
274    public String getFirstIndexForAlias(String aliasName) {
275        if (!aliasExists(aliasName)) {
276            return null;
277        }
278        try {
279            Response response = lowLevelClient.performRequest("GET", String.format("/_alias/%s", aliasName));
280            try (InputStream is = response.getEntity().getContent()) {
281                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
282                if (map.size() != 1) {
283                    throw new NuxeoException(String.format(
284                            "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
285                }
286                return map.keySet().iterator().next();
287            }
288        } catch (IOException e) {
289            throw new NuxeoException(e);
290        }
291    }
292
293    @Override
294    public void updateAlias(String aliasName, String indexName) {
295        // TODO do this in a single call to make it atomically
296        if (aliasExists(aliasName)) {
297            deleteAlias(aliasName);
298        }
299        if (indexExists(aliasName)) {
300            throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName);
301        }
302        createAlias(aliasName, indexName);
303    }
304
305    protected void deleteAlias(String aliasName) {
306        Response response;
307        try {
308            response = lowLevelClient.performRequest("DELETE", String.format("/_all/_alias/%s", aliasName));
309        } catch (IOException e) {
310            throw new NuxeoException(e);
311        }
312        int code = response.getStatusLine().getStatusCode();
313        if (code != HttpStatus.SC_OK) {
314            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
315        }
316    }
317
318    protected void createAlias(String aliasName, String indexName) {
319        Response response;
320        try {
321            response = lowLevelClient.performRequest("PUT", String.format("/%s/_alias/%s", indexName, aliasName),
322                    emptyMap());
323        } catch (IOException e) {
324            throw new NuxeoException(e);
325        }
326        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
327            throw new NuxeoException("Fail to create alias: " + indexName + " :" + response);
328        }
329    }
330
331    @Override
332    public BulkResponse bulk(BulkRequest request) {
333        try {
334            return client.bulk(request);
335        } catch (IOException e) {
336            throw new NuxeoException(e);
337        }
338    }
339
340    @Override
341    public DeleteResponse delete(DeleteRequest request) {
342        try {
343            return client.delete(request);
344        } catch (IOException e) {
345            throw new NuxeoException(e);
346        }
347    }
348
349    @Override
350    public SearchResponse search(SearchRequest request) {
351        try {
352            return client.search(request);
353        } catch (IOException e) {
354            throw new NuxeoException(e);
355        }
356    }
357
358    @Override
359    public SearchResponse searchScroll(SearchScrollRequest request) {
360        try {
361            return client.searchScroll(request);
362        } catch (IOException e) {
363            throw new NuxeoException(e);
364        }
365    }
366
367    @Override
368    public GetResponse get(GetRequest request) {
369        try {
370            return client.get(request);
371        } catch (IOException e) {
372            throw new NuxeoException(e);
373        }
374    }
375
376    @Override
377    public IndexResponse index(IndexRequest request) {
378        try {
379            return client.index(request);
380        } catch (ElasticsearchStatusException e) {
381            if (RestStatus.CONFLICT.equals(e.status())) {
382                throw new ConcurrentUpdateException(e);
383            }
384            throw new NuxeoException(e);
385        } catch (IOException e) {
386            throw new NuxeoException(e);
387        }
388    }
389
390    @Override
391    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
392        try {
393            if (log.isDebugEnabled()) {
394                log.debug(String.format("Clearing scroll ids: %s",
395                        Arrays.toString(request.getScrollIds().toArray())));
396            }
397            return client.clearScroll(request);
398        } catch (ElasticsearchStatusException e) {
399            if (RestStatus.NOT_FOUND.equals(e.status())) {
400                if (log.isDebugEnabled()) {
401                    log.debug(String.format("Scroll ids not found, they have certainly been already closed",
402                            Arrays.toString(request.getScrollIds().toArray())));
403                }
404                return new ClearScrollResponse(true, 0);
405            }
406            throw new NuxeoException(e);
407        } catch (IOException e) {
408            throw new NuxeoException(e);
409        }
410    }
411
412    @Override
413    public void close() {
414        if (lowLevelClient != null) {
415            try {
416                lowLevelClient.close();
417            } catch (IOException e) {
418                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
419            }
420            lowLevelClient = null;
421        }
422        client = null;
423    }
424}