001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import static java.util.Collections.emptyMap;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.util.Map;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.http.HttpEntity;
030import org.apache.http.HttpStatus;
031import org.apache.http.entity.ContentType;
032import org.apache.http.nio.entity.NStringEntity;
033import org.apache.http.util.EntityUtils;
034import org.elasticsearch.action.bulk.BulkRequest;
035import org.elasticsearch.action.bulk.BulkResponse;
036import org.elasticsearch.action.delete.DeleteRequest;
037import org.elasticsearch.action.delete.DeleteResponse;
038import org.elasticsearch.action.get.GetRequest;
039import org.elasticsearch.action.get.GetResponse;
040import org.elasticsearch.action.index.IndexRequest;
041import org.elasticsearch.action.index.IndexResponse;
042import org.elasticsearch.action.search.ClearScrollRequest;
043import org.elasticsearch.action.search.ClearScrollResponse;
044import org.elasticsearch.action.search.SearchRequest;
045import org.elasticsearch.action.search.SearchResponse;
046import org.elasticsearch.action.search.SearchScrollRequest;
047import org.elasticsearch.client.Response;
048import org.elasticsearch.client.RestClient;
049import org.elasticsearch.client.RestHighLevelClient;
050import org.elasticsearch.cluster.health.ClusterHealthStatus;
051import org.elasticsearch.common.xcontent.XContentHelper;
052import org.elasticsearch.common.xcontent.XContentType;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.elasticsearch.api.ESClient;
055
056/**
057 * @since 9.3
058 */
059public class ESRestClient implements ESClient {
060    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
061    private static final Log log = LogFactory.getLog(ESRestClient.class);
062
063    protected RestClient lowLevelClient;
064
065    protected RestHighLevelClient client;
066
067    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
068        this.lowLevelClient = lowLevelRestClient;
069        this.client = client;
070    }
071
072    @Override
073    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
074        ClusterHealthStatus healthStatus;
075        Response response;
076        try {
077            response = lowLevelClient.performRequest("GET",
078                    String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
079                            getIndexesAsString(indexNames), timeoutSecond));
080            try (InputStream is = response.getEntity().getContent()) {
081                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
082                healthStatus = ClusterHealthStatus.fromString((String) map.get("status"));
083            }
084        } catch (IOException e) {
085            throw new NuxeoException(e);
086        }
087        switch (healthStatus) {
088        case GREEN:
089            log.info("Elasticsearch Cluster ready: " + response);
090            return true;
091        case YELLOW:
092            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
093            return false;
094        default:
095            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
096                    + timeoutSecond + " give up: " + response;
097            throw new IllegalStateException(error);
098        }
099    }
100
101    protected String getIndexesAsString(String[] indexNames) {
102        return indexNames == null ? "" : String.join(",", indexNames);
103    }
104
105    @Override
106    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
107        try {
108            Response response = lowLevelClient.performRequest("GET",
109                    String.format("/_cluster/health/%s", getIndexesAsString(indexNames)));
110            try (InputStream is = response.getEntity().getContent()) {
111                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
112                return ClusterHealthStatus.fromString((String) map.get("status"));
113            }
114        } catch (IOException e) {
115            throw new NuxeoException(e);
116        }
117    }
118
119    @Override
120    public void refresh(String indexName) {
121        try {
122            lowLevelClient.performRequest("POST", "/" + indexName + "/_refresh");
123        } catch (IOException e) {
124            throw new NuxeoException(e);
125        }
126
127    }
128
129    @Override
130    public void flush(String indexName) {
131        try {
132            lowLevelClient.performRequest("POST", "/" + indexName + "/_flush?wait_if_ongoing=true");
133        } catch (IOException e) {
134            throw new NuxeoException(e);
135        }
136    }
137
138    @Override
139    public void optimize(String indexName) {
140        try {
141            lowLevelClient.performRequest("POST", "/" + indexName + "/_forcemerge?max_num_segments=1");
142        } catch (IOException e) {
143            throw new NuxeoException(e);
144        }
145    }
146
147    @Override
148    public boolean indexExists(String indexName) {
149        Response response;
150        try {
151            response = lowLevelClient.performRequest("HEAD", "/" + indexName);
152        } catch (IOException e) {
153            throw new NuxeoException(e);
154        }
155        int code = response.getStatusLine().getStatusCode();
156        if (code == HttpStatus.SC_OK) {
157            return true;
158        } else if (code == HttpStatus.SC_NOT_FOUND) {
159            return false;
160        }
161        throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response));
162    }
163
164    @Override
165    public boolean mappingExists(String indexName, String type) {
166        Response response;
167        try {
168            response = lowLevelClient.performRequest("HEAD", String.format("/%s/_mapping/%s", indexName, type));
169        } catch (IOException e) {
170            throw new NuxeoException(e);
171        }
172        int code = response.getStatusLine().getStatusCode();
173        if (code == HttpStatus.SC_OK) {
174            return true;
175        } else if (code == HttpStatus.SC_NOT_FOUND) {
176            return false;
177        }
178        throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response));
179    }
180
181    @Override
182    public void deleteIndex(String indexName, int timeoutSecond) {
183        Response response;
184        try {
185            response = lowLevelClient.performRequest("DELETE",
186                    String.format("/%s?master_timeout=%ds", indexName, timeoutSecond));
187        } catch (IOException e) {
188            throw new NuxeoException(e);
189        }
190        int code = response.getStatusLine().getStatusCode();
191        if (code != HttpStatus.SC_OK) {
192            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
193        }
194    }
195
196    @Override
197    public void createIndex(String indexName, String jsonSettings) {
198        HttpEntity entity = new NStringEntity(jsonSettings, ContentType.APPLICATION_JSON);
199        Response response;
200        try {
201            response = lowLevelClient.performRequest("PUT", "/" + indexName, emptyMap(), entity);
202        } catch (IOException e) {
203            throw new NuxeoException(e);
204        }
205        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
206            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
207        }
208    }
209
210    @Override
211    public void createMapping(String indexName, String type, String jsonMapping) {
212        HttpEntity entity = new NStringEntity(jsonMapping, ContentType.APPLICATION_JSON);
213        Response response;
214        try {
215            response = lowLevelClient.performRequest("PUT", String.format("/%s/%s/_mapping", indexName, type),
216                    emptyMap(), entity);
217        } catch (IOException e) {
218            throw new NuxeoException(e);
219        }
220        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
221            throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response));
222        }
223
224    }
225
226    @Override
227    public String getNodesInfo() {
228        try {
229            Response response = lowLevelClient.performRequest("GET", "/_nodes/_all");
230            return EntityUtils.toString(response.getEntity());
231        } catch (IOException e) {
232            throw new NuxeoException(e);
233        }
234    }
235
236    @Override
237    public String getNodesStats() {
238        try {
239            Response response = lowLevelClient.performRequest("GET", "/_nodes/stats");
240            return EntityUtils.toString(response.getEntity());
241        } catch (IOException e) {
242            throw new NuxeoException(e);
243        }
244    }
245
246    @Override
247    public boolean aliasExists(String aliasName) {
248        Response response;
249        try {
250            response = lowLevelClient.performRequest("HEAD", String.format("/_alias/%s", aliasName));
251        } catch (IOException e) {
252            throw new NuxeoException(e);
253        }
254        int code = response.getStatusLine().getStatusCode();
255        if (code == HttpStatus.SC_OK) {
256            return true;
257        } else if (code == HttpStatus.SC_NOT_FOUND) {
258            return false;
259        }
260        throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response));
261    }
262
263    @Override
264    public String getFirstIndexForAlias(String aliasName) {
265        if (!aliasExists(aliasName)) {
266            return null;
267        }
268        try {
269            Response response = lowLevelClient.performRequest("GET", String.format("/_alias/%s", aliasName));
270            try (InputStream is = response.getEntity().getContent()) {
271                Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
272                if (map.size() != 1) {
273                    throw new NuxeoException(String.format(
274                            "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
275                }
276                return map.keySet().iterator().next();
277            }
278        } catch (IOException e) {
279            throw new NuxeoException(e);
280        }
281    }
282
283    @Override
284    public void updateAlias(String aliasName, String indexName) {
285        // TODO do this in a single call to make it atomically
286        if (aliasExists(aliasName)) {
287            deleteAlias(aliasName);
288        }
289        if (indexExists(aliasName)) {
290            throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName);
291        }
292        createAlias(aliasName, indexName);
293    }
294
295    protected void deleteAlias(String aliasName) {
296        Response response;
297        try {
298            response = lowLevelClient.performRequest("DELETE", String.format("/_all/_alias/%s", aliasName));
299        } catch (IOException e) {
300            throw new NuxeoException(e);
301        }
302        int code = response.getStatusLine().getStatusCode();
303        if (code != HttpStatus.SC_OK) {
304            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
305        }
306    }
307
308    protected void createAlias(String aliasName, String indexName) {
309        Response response;
310        try {
311            response = lowLevelClient.performRequest("PUT", String.format("/%s/_alias/%s", indexName, aliasName),
312                    emptyMap());
313        } catch (IOException e) {
314            throw new NuxeoException(e);
315        }
316        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
317            throw new NuxeoException("Fail to create alias: " + indexName + " :" + response);
318        }
319    }
320
321    @Override
322    public BulkResponse bulk(BulkRequest request) {
323        try {
324            return client.bulk(request);
325        } catch (IOException e) {
326            throw new NuxeoException(e);
327        }
328    }
329
330    @Override
331    public DeleteResponse delete(DeleteRequest request) {
332        try {
333            return client.delete(request);
334        } catch (IOException e) {
335            throw new NuxeoException(e);
336        }
337    }
338
339    @Override
340    public SearchResponse search(SearchRequest request) {
341        try {
342            return client.search(request);
343        } catch (IOException e) {
344            throw new NuxeoException(e);
345        }
346    }
347
348    @Override
349    public SearchResponse searchScroll(SearchScrollRequest request) {
350        try {
351            return client.searchScroll(request);
352        } catch (IOException e) {
353            throw new NuxeoException(e);
354        }
355    }
356
357    @Override
358    public GetResponse get(GetRequest request) {
359        try {
360            return client.get(request);
361        } catch (IOException e) {
362            throw new NuxeoException(e);
363        }
364    }
365
366    @Override
367    public IndexResponse index(IndexRequest request) {
368        try {
369            return client.index(request);
370        } catch (IOException e) {
371            throw new NuxeoException(e);
372        }
373    }
374
375    @Override
376    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
377        try {
378            return client.clearScroll(request);
379        } catch (IOException e) {
380            throw new NuxeoException(e);
381        }
382    }
383
384    @Override
385    public void close() {
386        if (lowLevelClient != null) {
387            try {
388                lowLevelClient.close();
389            } catch (IOException e) {
390                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
391            }
392            lowLevelClient = null;
393        }
394        client = null;
395    }
396}