/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.client;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ESClient;

/**
 * {@link ESClient} implementation that talks to Elasticsearch over HTTP, using the low level {@link RestClient} for
 * administrative calls (health, index, mapping and alias management) and the {@link RestHighLevelClient} for
 * document level calls (index, get, delete, search, scroll, bulk).
 * <p>
 * I/O failures are rethrown as {@link NuxeoException}.
 *
 * @since 9.3
 */
public class ESRestClient implements ESClient {
    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
    private static final Log log = LogFactory.getLog(ESRestClient.class);

    /** Value of the {@code timeout} query parameter sent when creating an index. */
    public static final String CREATE_INDEX_TIMEOUT = "60s";

    protected RestClient lowLevelClient;

    protected RestHighLevelClient client;

    /**
     * @param lowLevelRestClient the client used for raw REST requests, this is the one closed by {@link #close()}
     * @param client the high level client used for document and search requests
     */
    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
        this.lowLevelClient = lowLevelRestClient;
        this.client = client;
    }

    /**
     * Asks the cluster health endpoint to wait for at least a yellow status on the given indexes.
     *
     * @param indexNames the indexes to check, {@code null} sends an empty index list
     * @param timeoutSecond the timeout in seconds passed to Elasticsearch
     * @return {@code true} when the reported status is GREEN, {@code false} when it is YELLOW
     * @throws IllegalStateException when the reported status is neither GREEN nor YELLOW
     * @throws NuxeoException when the request or the reading of its response fails
     */
    @Override
    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
        Response response = performRequest(
                new Request("GET", String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
                        getIndexesAsString(indexNames), timeoutSecond)));
        ClusterHealthStatus healthStatus = readHealthStatus(response);
        switch (healthStatus) {
        case GREEN:
            log.info("Elasticsearch Cluster ready: " + response);
            return true;
        case YELLOW:
            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
            return false;
        default:
            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
                    + timeoutSecond + "s give up: " + response;
            throw new IllegalStateException(error);
        }
    }

    /**
     * Joins the index names with commas so they can be used in a request path.
     *
     * @return the comma separated names, or an empty string when {@code indexNames} is {@code null}
     */
    protected String getIndexesAsString(String[] indexNames) {
        return indexNames == null ? "" : String.join(",", indexNames);
    }

    /**
     * Returns the cluster health status reported for the given indexes, without waiting for a particular status.
     *
     * @throws NuxeoException when the request or the reading of its response fails
     */
    @Override
    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
        Response response = performRequest(
                new Request("GET", String.format("/_cluster/health/%s", getIndexesAsString(indexNames))));
        return readHealthStatus(response);
    }

    /**
     * Parses the JSON body of a cluster health response and extracts its {@code status} field.
     *
     * @throws NuxeoException when the response body cannot be read
     */
    private ClusterHealthStatus readHealthStatus(Response response) {
        try (InputStream is = response.getEntity().getContent()) {
            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
            return ClusterHealthStatus.fromString((String) map.get("status"));
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /** Sends a {@code _refresh} request on the given index. */
    @Override
    public void refresh(String indexName) {
        performRequest(new Request("POST", "/" + indexName + "/_refresh"));
    }

    /** Sends a {@code _flush} request with {@code wait_if_ongoing=true} on the given index. */
    @Override
    public void flush(String indexName) {
        performRequest(new Request("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"));
    }

    /** Sends a {@code _forcemerge} request with {@code max_num_segments=1} on the given index. */
    @Override
    public void optimize(String indexName) {
        performRequest(new Request("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"));
    }

    /**
     * Checks the index with a HEAD request.
     *
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status
     */
    @Override
    public boolean indexExists(String indexName) {
        Response response = performRequest(new Request("HEAD", "/" + indexName));
        switch (response.getStatusLine().getStatusCode()) {
        case HttpStatus.SC_OK:
            return true;
        case HttpStatus.SC_NOT_FOUND:
            return false;
        default:
            throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response));
        }
    }

    /**
     * Checks the mapping of the given type on the given index with a HEAD request.
     *
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status
     */
    @Override
    public boolean mappingExists(String indexName, String type) {
        Response response = performRequest(new Request("HEAD", String.format("/%s/_mapping/%s", indexName, type)));
        switch (response.getStatusLine().getStatusCode()) {
        case HttpStatus.SC_OK:
            return true;
        case HttpStatus.SC_NOT_FOUND:
            return false;
        default:
            // report both the index and the type, the check is about the pair
            throw new IllegalStateException(
                    String.format("Checking mapping %s/%s returns: %s", indexName, type, response));
        }
    }

    /**
     * Deletes the given index.
     *
     * @param indexName the index to delete
     * @param timeoutSecond the value in seconds of the {@code master_timeout} query parameter
     * @throws IllegalArgumentException when the failure message contains {@code illegal_argument_exception}
     * @throws NuxeoException on any other I/O failure
     * @throws IllegalStateException when the response status is not 200
     */
    @Override
    public void deleteIndex(String indexName, int timeoutSecond) {
        Response response;
        try {
            // bypass performRequest(Request): the IOException message must be inspected before wrapping
            response = lowLevelClient.performRequest(
                    new Request("DELETE", String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)));
        } catch (IOException e) {
            if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) {
                // when trying to delete an alias, throw the same exception as the transport client
                throw new IllegalArgumentException(e);
            }
            throw new NuxeoException(e);
        }
        int code = response.getStatusLine().getStatusCode();
        if (code != HttpStatus.SC_OK) {
            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
        }
    }

    /**
     * Creates an index with the given JSON settings, using {@link #CREATE_INDEX_TIMEOUT} as timeout.
     *
     * @throws NuxeoException when the request fails or the response status is not 200
     */
    @Override
    public void createIndex(String indexName, String jsonSettings) {
        Request request = new Request("PUT", "/" + indexName + "?timeout=" + CREATE_INDEX_TIMEOUT);
        request.setJsonEntity(jsonSettings);
        Response response = performRequest(request);
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
        }
    }

    /**
     * Puts the given JSON mapping for the type on the index.
     *
     * @throws NuxeoException when the request fails or the response status is not 200
     */
    @Override
    public void createMapping(String indexName, String type, String jsonMapping) {
        Request request = new Request("PUT", String.format("/%s/%s/_mapping", indexName, type));
        request.setJsonEntity(jsonMapping);
        Response response = performRequest(request);
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response));
        }
    }

    /**
     * Runs the request on the low level client.
     *
     * @throws NuxeoException wrapping any {@link IOException} raised by the client
     */
    protected Response performRequest(Request request) {
        try {
            return lowLevelClient.performRequest(request);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns the body of a {@code GET /_nodes/_all} request as a string.
     *
     * @throws NuxeoException when the request or the reading of its response fails
     */
    @Override
    public String getNodesInfo() {
        Response response = performRequest(new Request("GET", "/_nodes/_all"));
        try {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns the body of a {@code GET /_nodes/stats} request as a string.
     *
     * @throws NuxeoException when the request or the reading of its response fails
     */
    @Override
    public String getNodesStats() {
        Response response = performRequest(new Request("GET", "/_nodes/stats"));
        try {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Checks the alias with a HEAD request.
     *
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status
     */
    @Override
    public boolean aliasExists(String aliasName) {
        Response response = performRequest(new Request("HEAD", String.format("/_alias/%s", aliasName)));
        switch (response.getStatusLine().getStatusCode()) {
        case HttpStatus.SC_OK:
            return true;
        case HttpStatus.SC_NOT_FOUND:
            return false;
        default:
            throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response));
        }
    }

    /**
     * Returns the name of the index the alias points to.
     *
     * @return the single top level key of the alias response, or {@code null} when the alias does not exist
     * @throws NuxeoException when the alias response does not contain exactly one entry, or on I/O failure
     */
    @Override
    public String getFirstIndexForAlias(String aliasName) {
        if (!aliasExists(aliasName)) {
            return null;
        }
        Response response = performRequest(new Request("GET", String.format("/_alias/%s", aliasName)));
        try (InputStream is = response.getEntity().getContent()) {
            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
            if (map.size() != 1) {
                throw new NuxeoException(String.format(
                        "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
            }
            return map.keySet().iterator().next();
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Makes the alias point to the given index: an existing alias is deleted first, then the alias is created.
     *
     * @throws NuxeoException when an index named like the alias exists, or when the alias creation fails
     */
    @Override
    public void updateAlias(String aliasName, String indexName) {
        // TODO do this in a single call to make it atomic
        if (aliasExists(aliasName)) {
            deleteAlias(aliasName);
        }
        if (indexExists(aliasName)) {
            throw new NuxeoException(
                    "Cannot create an alias because an index with the same name exists: " + aliasName);
        }
        createAlias(aliasName, indexName);
    }

    /**
     * Deletes the alias with a {@code DELETE /_all/_alias/<alias>} request.
     *
     * @throws IllegalStateException when the response status is not 200
     */
    protected void deleteAlias(String aliasName) {
        Response response = performRequest(new Request("DELETE", String.format("/_all/_alias/%s", aliasName)));
        int code = response.getStatusLine().getStatusCode();
        if (code != HttpStatus.SC_OK) {
            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
        }
    }

    /**
     * Creates the alias on the index with a {@code PUT /<index>/_alias/<alias>} request.
     *
     * @throws NuxeoException when the request fails or the response status is not 200
     */
    protected void createAlias(String aliasName, String indexName) {
        Response response = performRequest(new Request("PUT", String.format("/%s/_alias/%s", indexName, aliasName)));
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException(
                    String.format("Fail to create alias %s on index %s: %s", aliasName, indexName, response));
        }
    }

    /**
     * Runs the bulk request on the high level client with the default request options.
     *
     * @throws NuxeoException wrapping any {@link IOException}
     */
    @Override
    public BulkResponse bulk(BulkRequest request) {
        try {
            return client.bulk(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Runs the delete request on the high level client with the default request options.
     *
     * @throws NuxeoException wrapping any {@link IOException}
     */
    @Override
    public DeleteResponse delete(DeleteRequest request) {
        try {
            return client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Runs the search request on the high level client with the default request options.
     *
     * @throws NuxeoException wrapping any {@link IOException}
     */
    @Override
    public SearchResponse search(SearchRequest request) {
        try {
            return client.search(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Runs the scroll request on the high level client with the default request options.
     *
     * @throws NuxeoException wrapping any {@link IOException}
     */
    @Override
    public SearchResponse searchScroll(SearchScrollRequest request) {
        try {
            return client.scroll(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Runs the get request on the high level client with the default request options.
     *
     * @throws NuxeoException wrapping any {@link IOException}
     */
    @Override
    public GetResponse get(GetRequest request) {
        try {
            return client.get(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Runs the index request on the high level client with the default request options.
     *
     * @throws ConcurrentUpdateException when Elasticsearch answers with a CONFLICT status
     * @throws NuxeoException on any other Elasticsearch status error or I/O failure
     */
    @Override
    public IndexResponse index(IndexRequest request) {
        try {
            return client.index(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchStatusException e) {
            if (RestStatus.CONFLICT.equals(e.status())) {
                throw new ConcurrentUpdateException(e);
            }
            throw new NuxeoException(e);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Clears the scroll ids of the request. A NOT_FOUND status is not treated as an error: a successful response
     * reporting zero freed contexts is returned instead.
     *
     * @throws NuxeoException on any other Elasticsearch status error or I/O failure
     */
    @Override
    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Clearing scroll ids: %s", Arrays.toString(request.getScrollIds().toArray())));
            }
            return client.clearScroll(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchStatusException e) {
            if (RestStatus.NOT_FOUND.equals(e.status())) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Scroll ids not found, they have certainly been already closed: %s",
                            Arrays.toString(request.getScrollIds().toArray())));
                }
                return new ClearScrollResponse(true, 0);
            }
            throw new NuxeoException(e);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns a bulk processor builder whose requests are sent asynchronously through the high level client.
     *
     * @param bulkListener the listener handed over to the builder
     */
    @Override
    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener bulkListener) {
        return BulkProcessor.builder((request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
                bulkListener);
    }

    /**
     * Closes the low level client, a failure to close is logged and not rethrown, then drops the references to both
     * clients.
     */
    @Override
    public void close() {
        if (lowLevelClient != null) {
            try {
                lowLevelClient.close();
            } catch (IOException e) {
                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
            }
            lowLevelClient = null;
        }
        client = null;
    }
}