001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.client; 020 021import static java.util.Collections.emptyMap; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.util.Arrays; 026import java.util.Map; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.http.HttpEntity; 031import org.apache.http.HttpStatus; 032import org.apache.http.entity.ContentType; 033import org.apache.http.nio.entity.NStringEntity; 034import org.apache.http.util.EntityUtils; 035import org.elasticsearch.ElasticsearchStatusException; 036import org.elasticsearch.action.bulk.BulkRequest; 037import org.elasticsearch.action.bulk.BulkResponse; 038import org.elasticsearch.action.delete.DeleteRequest; 039import org.elasticsearch.action.delete.DeleteResponse; 040import org.elasticsearch.action.get.GetRequest; 041import org.elasticsearch.action.get.GetResponse; 042import org.elasticsearch.action.index.IndexRequest; 043import org.elasticsearch.action.index.IndexResponse; 044import org.elasticsearch.action.search.ClearScrollRequest; 045import org.elasticsearch.action.search.ClearScrollResponse; 046import org.elasticsearch.action.search.SearchRequest; 047import org.elasticsearch.action.search.SearchResponse; 048import org.elasticsearch.action.search.SearchScrollRequest; 049import org.elasticsearch.client.Response; 050import org.elasticsearch.client.ResponseException; 051import org.elasticsearch.client.RestClient; 052import org.elasticsearch.client.RestHighLevelClient; 053import org.elasticsearch.cluster.health.ClusterHealthStatus; 054import org.elasticsearch.common.xcontent.XContentHelper; 055import org.elasticsearch.common.xcontent.XContentType; 056import org.elasticsearch.index.engine.VersionConflictEngineException; 057import org.elasticsearch.rest.RestStatus; 058import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 059import org.nuxeo.ecm.core.api.NuxeoException; 060import org.nuxeo.elasticsearch.api.ESClient; 061 062/** 063 * @since 9.3 064 */ 065public class ESRestClient implements ESClient { 066 // TODO: add security sanitizer to make sure all parameters used to build requests are clean 067 private static final Log log = LogFactory.getLog(ESRestClient.class); 068 069 protected RestClient lowLevelClient; 070 071 protected RestHighLevelClient client; 072 073 public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) { 074 this.lowLevelClient = lowLevelRestClient; 075 this.client = client; 076 } 077 078 @Override 079 public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) { 080 ClusterHealthStatus healthStatus; 081 Response response; 082 try { 083 response = lowLevelClient.performRequest("GET", 084 String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds", 085 getIndexesAsString(indexNames), timeoutSecond)); 086 try (InputStream is = response.getEntity().getContent()) { 087 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 088 healthStatus = ClusterHealthStatus.fromString((String) map.get("status")); 089 } 090 } catch (IOException e) { 091 throw new NuxeoException(e); 092 } 093 switch (healthStatus) { 094 case GREEN: 095 log.info("Elasticsearch Cluster ready: " + response); 096 return true; 097 case YELLOW: 098 log.warn("Elasticsearch Cluster ready but not GREEN: " + response); 099 return false; 100 default: 101 String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after " 102 + timeoutSecond + " give up: " + response; 103 throw new IllegalStateException(error); 104 } 105 } 106 107 protected String getIndexesAsString(String[] indexNames) { 108 return indexNames == null ? "" : String.join(",", indexNames); 109 } 110 111 @Override 112 public ClusterHealthStatus getHealthStatus(String[] indexNames) { 113 try { 114 Response response = lowLevelClient.performRequest("GET", 115 String.format("/_cluster/health/%s", getIndexesAsString(indexNames))); 116 try (InputStream is = response.getEntity().getContent()) { 117 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 118 return ClusterHealthStatus.fromString((String) map.get("status")); 119 } 120 } catch (IOException e) { 121 throw new NuxeoException(e); 122 } 123 } 124 125 @Override 126 public void refresh(String indexName) { 127 try { 128 lowLevelClient.performRequest("POST", "/" + indexName + "/_refresh"); 129 } catch (IOException e) { 130 throw new NuxeoException(e); 131 } 132 133 } 134 135 @Override 136 public void flush(String indexName) { 137 try { 138 lowLevelClient.performRequest("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"); 139 } catch (IOException e) { 140 throw new NuxeoException(e); 141 } 142 } 143 144 @Override 145 public void optimize(String indexName) { 146 try { 147 lowLevelClient.performRequest("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"); 148 } catch (IOException e) { 149 throw new NuxeoException(e); 150 } 151 } 152 153 @Override 154 public boolean indexExists(String indexName) { 155 Response response; 156 try { 157 response = lowLevelClient.performRequest("HEAD", "/" + indexName); 158 } catch (IOException e) { 159 throw new NuxeoException(e); 160 } 161 int code = response.getStatusLine().getStatusCode(); 162 if (code == HttpStatus.SC_OK) { 163 return true; 164 } else if (code == HttpStatus.SC_NOT_FOUND) { 165 return false; 166 } 167 throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response)); 168 } 169 170 @Override 171 public boolean mappingExists(String indexName, String type) { 172 Response response; 173 try { 174 response = lowLevelClient.performRequest("HEAD", String.format("/%s/_mapping/%s", indexName, type)); 175 } catch (IOException e) { 176 throw new NuxeoException(e); 177 } 178 int code = response.getStatusLine().getStatusCode(); 179 if (code == HttpStatus.SC_OK) { 180 return true; 181 } else if (code == HttpStatus.SC_NOT_FOUND) { 182 return false; 183 } 184 throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response)); 185 } 186 187 @Override 188 public void deleteIndex(String indexName, int timeoutSecond) { 189 Response response; 190 try { 191 response = lowLevelClient.performRequest("DELETE", 192 String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)); 193 } catch (IOException e) { 194 if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) { 195 // when trying to delete an alias, throws the same exception than the transport client 196 throw new IllegalArgumentException(e); 197 } 198 throw new NuxeoException(e); 199 } 200 int code = response.getStatusLine().getStatusCode(); 201 if (code != HttpStatus.SC_OK) { 202 throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response)); 203 } 204 } 205 206 @Override 207 public void createIndex(String indexName, String jsonSettings) { 208 HttpEntity entity = new NStringEntity(jsonSettings, ContentType.APPLICATION_JSON); 209 Response response; 210 try { 211 response = lowLevelClient.performRequest("PUT", "/" + indexName, emptyMap(), entity); 212 } catch (IOException e) { 213 throw new NuxeoException(e); 214 } 215 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 216 throw new NuxeoException("Fail to create index: " + indexName + " :" + response); 217 } 218 } 219 220 @Override 221 public void createMapping(String indexName, String type, String jsonMapping) { 222 HttpEntity entity = new NStringEntity(jsonMapping, ContentType.APPLICATION_JSON); 223 Response response; 224 try { 225 response = lowLevelClient.performRequest("PUT", String.format("/%s/%s/_mapping", indexName, type), 226 emptyMap(), entity); 227 } catch (IOException e) { 228 throw new NuxeoException(e); 229 } 230 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 231 throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response)); 232 } 233 234 } 235 236 @Override 237 public String getNodesInfo() { 238 try { 239 Response response = lowLevelClient.performRequest("GET", "/_nodes/_all"); 240 return EntityUtils.toString(response.getEntity()); 241 } catch (IOException e) { 242 throw new NuxeoException(e); 243 } 244 } 245 246 @Override 247 public String getNodesStats() { 248 try { 249 Response response = lowLevelClient.performRequest("GET", "/_nodes/stats"); 250 return EntityUtils.toString(response.getEntity()); 251 } catch (IOException e) { 252 throw new NuxeoException(e); 253 } 254 } 255 256 @Override 257 public boolean aliasExists(String aliasName) { 258 Response response; 259 try { 260 response = lowLevelClient.performRequest("HEAD", String.format("/_alias/%s", aliasName)); 261 } catch (IOException e) { 262 throw new NuxeoException(e); 263 } 264 int code = response.getStatusLine().getStatusCode(); 265 if (code == HttpStatus.SC_OK) { 266 return true; 267 } else if (code == HttpStatus.SC_NOT_FOUND) { 268 return false; 269 } 270 throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response)); 271 } 272 273 @Override 274 public String getFirstIndexForAlias(String aliasName) { 275 if (!aliasExists(aliasName)) { 276 return null; 277 } 278 try { 279 Response response = lowLevelClient.performRequest("GET", String.format("/_alias/%s", aliasName)); 280 try (InputStream is = response.getEntity().getContent()) { 281 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 282 if (map.size() != 1) { 283 throw new NuxeoException(String.format( 284 "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response)); 285 } 286 return map.keySet().iterator().next(); 287 } 288 } catch (IOException e) { 289 throw new NuxeoException(e); 290 } 291 } 292 293 @Override 294 public void updateAlias(String aliasName, String indexName) { 295 // TODO do this in a single call to make it atomically 296 if (aliasExists(aliasName)) { 297 deleteAlias(aliasName); 298 } 299 if (indexExists(aliasName)) { 300 throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName); 301 } 302 createAlias(aliasName, indexName); 303 } 304 305 protected void deleteAlias(String aliasName) { 306 Response response; 307 try { 308 response = lowLevelClient.performRequest("DELETE", String.format("/_all/_alias/%s", aliasName)); 309 } catch (IOException e) { 310 throw new NuxeoException(e); 311 } 312 int code = response.getStatusLine().getStatusCode(); 313 if (code != HttpStatus.SC_OK) { 314 throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response)); 315 } 316 } 317 318 protected void createAlias(String aliasName, String indexName) { 319 Response response; 320 try { 321 response = lowLevelClient.performRequest("PUT", String.format("/%s/_alias/%s", indexName, aliasName), 322 emptyMap()); 323 } catch (IOException e) { 324 throw new NuxeoException(e); 325 } 326 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 327 throw new NuxeoException("Fail to create alias: " + indexName + " :" + response); 328 } 329 } 330 331 @Override 332 public BulkResponse bulk(BulkRequest request) { 333 try { 334 return client.bulk(request); 335 } catch (IOException e) { 336 throw new NuxeoException(e); 337 } 338 } 339 340 @Override 341 public DeleteResponse delete(DeleteRequest request) { 342 try { 343 return client.delete(request); 344 } catch (IOException e) { 345 throw new NuxeoException(e); 346 } 347 } 348 349 @Override 350 public SearchResponse search(SearchRequest request) { 351 try { 352 return client.search(request); 353 } catch (IOException e) { 354 throw new NuxeoException(e); 355 } 356 } 357 358 @Override 359 public SearchResponse searchScroll(SearchScrollRequest request) { 360 try { 361 return client.searchScroll(request); 362 } catch (IOException e) { 363 throw new NuxeoException(e); 364 } 365 } 366 367 @Override 368 public GetResponse get(GetRequest request) { 369 try { 370 return client.get(request); 371 } catch (IOException e) { 372 throw new NuxeoException(e); 373 } 374 } 375 376 @Override 377 public IndexResponse index(IndexRequest request) { 378 try { 379 return client.index(request); 380 } catch (ElasticsearchStatusException e) { 381 if (RestStatus.CONFLICT.equals(e.status())) { 382 throw new ConcurrentUpdateException(e); 383 } 384 throw new NuxeoException(e); 385 } catch (IOException e) { 386 throw new NuxeoException(e); 387 } 388 } 389 390 @Override 391 public ClearScrollResponse clearScroll(ClearScrollRequest request) { 392 try { 393 if (log.isDebugEnabled()) { 394 log.debug(String.format("Clearing scroll ids: %s", 395 Arrays.toString(request.getScrollIds().toArray()))); 396 } 397 return client.clearScroll(request); 398 } catch (ElasticsearchStatusException e) { 399 if (RestStatus.NOT_FOUND.equals(e.status())) { 400 if (log.isDebugEnabled()) { 401 log.debug(String.format("Scroll ids not found, they have certainly been already closed", 402 Arrays.toString(request.getScrollIds().toArray()))); 403 } 404 return new ClearScrollResponse(true, 0); 405 } 406 throw new NuxeoException(e); 407 } catch (IOException e) { 408 throw new NuxeoException(e); 409 } 410 } 411 412 @Override 413 public void close() { 414 if (lowLevelClient != null) { 415 try { 416 lowLevelClient.close(); 417 } catch (IOException e) { 418 log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e); 419 } 420 lowLevelClient = null; 421 } 422 client = null; 423 } 424}