/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.client;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ESClient;

import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;

/**
 * {@link ESClient} implementation that talks to Elasticsearch over its REST API.
 * <p>
 * Administrative calls (health, index, mapping and alias management) are sent through the low level
 * {@link RestClient}; document level calls (bulk, search, get, index, delete, scroll) are delegated to the
 * {@link RestHighLevelClient}. Most calls are wrapped into an OpenCensus client span, see
 * {@link #getScopedSpan(String, String)}.
 *
 * @since 9.3
 */
public class ESRestClient implements ESClient {
    // TODO: add security sanitizer to make sure all parameters used to build requests are clean
    private static final Log log = LogFactory.getLog(ESRestClient.class);

    /** Value of the {@code timeout} query parameter sent when creating an index. */
    public static final String CREATE_INDEX_TIMEOUT = "60s";

    protected RestClient lowLevelClient;

    protected RestHighLevelClient client;

    /**
     * Creates a client on top of already configured Elasticsearch REST clients.
     *
     * @param lowLevelRestClient the low level client used for administrative requests, closed by {@link #close()}
     * @param client the high level client used for document level requests
     */
    public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) {
        this.lowLevelClient = lowLevelRestClient;
        this.client = client;
    }

    /**
     * Asks the cluster health endpoint to wait for at least the yellow status on the given indexes.
     * <p>
     * NOTE(review): this implementation returns {@code true} for GREEN and {@code false} for YELLOW; confirm that
     * this matches what callers of {@link ESClient#waitForYellowStatus(String[], int)} expect.
     *
     * @param indexNames the indexes to check, {@code null} to not restrict the check to specific indexes
     * @param timeoutSecond the {@code timeout} in seconds passed to the health request
     * @return {@code true} if the reported status is GREEN, {@code false} if it is YELLOW
     * @throws IllegalStateException if the reported status is neither GREEN nor YELLOW
     * @throws NuxeoException if the request fails or its response cannot be read
     */
    @Override
    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
        Response response = performRequestWithTracing(
                new Request("GET", String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds",
                        getIndexesAsString(indexNames), timeoutSecond)));
        ClusterHealthStatus healthStatus = readHealthStatus(response);
        switch (healthStatus) {
        case GREEN:
            log.info("Elasticsearch Cluster ready: " + response);
            return true;
        case YELLOW:
            log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
            return false;
        default:
            String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after "
                    + timeoutSecond + "s give up: " + response;
            throw new IllegalStateException(error);
        }
    }

    /**
     * Joins index names with commas so that they can be used as a path segment.
     *
     * @param indexNames the index names, may be {@code null}
     * @return the comma separated names, or an empty string when {@code indexNames} is {@code null}
     */
    protected String getIndexesAsString(String[] indexNames) {
        return indexNames == null ? "" : String.join(",", indexNames);
    }

    /**
     * Returns the health status reported by the cluster health endpoint for the given indexes.
     *
     * @param indexNames the indexes to check, {@code null} to not restrict the check to specific indexes
     * @return the status found in the {@code status} field of the response
     * @throws NuxeoException if the request fails or its response cannot be read
     */
    @Override
    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
        Response response = performRequestWithTracing(
                new Request("GET", String.format("/_cluster/health/%s", getIndexesAsString(indexNames))));
        return readHealthStatus(response);
    }

    /** Extracts the {@code status} field from the JSON body of a cluster health response. */
    private static ClusterHealthStatus readHealthStatus(Response response) {
        try (InputStream is = response.getEntity().getContent()) {
            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
            return ClusterHealthStatus.fromString((String) map.get("status"));
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /** Sends a {@code _refresh} request for the given index. */
    @Override
    public void refresh(String indexName) {
        performRequestWithTracing(new Request("POST", "/" + indexName + "/_refresh"));
    }

    /** Sends a {@code _flush} request with {@code wait_if_ongoing=true} for the given index. */
    @Override
    public void flush(String indexName) {
        performRequestWithTracing(new Request("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"));
    }

    /** Sends a {@code _forcemerge} request with {@code max_num_segments=1} for the given index. */
    @Override
    public void optimize(String indexName) {
        performRequestWithTracing(new Request("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"));
    }

    /**
     * Checks whether an index exists using a {@code HEAD} request.
     *
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status
     */
    @Override
    public boolean indexExists(String indexName) {
        Response response = performRequestWithTracing(new Request("HEAD", "/" + indexName));
        return existsFromStatus(response, "index", indexName);
    }

    /**
     * Checks whether the mapping endpoint of an index answers successfully.
     *
     * @param indexName the index to check
     * @param type not used to build the request
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status returned by the client
     */
    @Override
    public boolean mappingExists(String indexName, String type) {
        // HEAD is not supported anymore since elastic 7.x
        Request request = new Request("GET", String.format("/%s/_mapping", indexName));
        // Unlike HEAD, a GET answering 404 makes the low level client throw a ResponseException. "ignore" is a
        // client side only parameter telling it to hand the 404 response back, so that it maps to false below.
        request.addParameter("ignore", "404");
        Response response = performRequestWithTracing(request);
        return existsFromStatus(response, "mapping", indexName);
    }

    /** Maps a 200/404 response to {@code true}/{@code false} and rejects any other status. */
    private static boolean existsFromStatus(Response response, String kind, String name) {
        switch (response.getStatusLine().getStatusCode()) {
        case HttpStatus.SC_OK:
            return true;
        case HttpStatus.SC_NOT_FOUND:
            return false;
        default:
            throw new IllegalStateException(String.format("Checking %s %s returns: %s", kind, name, response));
        }
    }

    /**
     * Deletes an index.
     *
     * @param indexName the index to delete
     * @param timeoutSecond the {@code master_timeout} in seconds passed to the delete request
     * @throws IllegalArgumentException if the request fails with a message containing
     *             {@code illegal_argument_exception}
     * @throws NuxeoException if the request fails for another reason
     * @throws IllegalStateException if the response status is not 200
     */
    @Override
    public void deleteIndex(String indexName, int timeoutSecond) {
        Response response;
        try {
            // the low level client is called directly because the IOException needs to be inspected
            response = lowLevelClient.performRequest(
                    new Request("DELETE", String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)));
        } catch (IOException e) {
            if (e.getMessage() != null && e.getMessage().contains("illegal_argument_exception")) {
                // when trying to delete an alias, throws the same exception than the transport client
                throw new IllegalArgumentException(e);
            }
            throw new NuxeoException(e);
        }
        int code = response.getStatusLine().getStatusCode();
        if (code != HttpStatus.SC_OK) {
            throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response));
        }
    }

    /**
     * Creates an index with the given settings.
     *
     * @param indexName the index to create
     * @param jsonSettings the JSON settings, sent wrapped into a {@code settings} object
     * @throws NuxeoException if the request fails or the response status is not 200
     */
    @Override
    public void createIndex(String indexName, String jsonSettings) {
        Request request = new Request("PUT", "/" + indexName + "?timeout=" + CREATE_INDEX_TIMEOUT);
        // since elastic 7 REST API needs an additional level
        request.setJsonEntity("{\"settings\": " + jsonSettings + "}");
        Response response = performRequestWithTracing(request);
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException("Fail to create index: " + indexName + " :" + response);
        }
    }

    /**
     * Puts a mapping on an index.
     *
     * @param indexName the target index
     * @param type not used to build the request
     * @param jsonMapping the JSON mapping sent as request body
     * @throws NuxeoException if the request fails or the response status is not 200
     */
    @Override
    public void createMapping(String indexName, String type, String jsonMapping) {
        Request request = new Request("PUT", String.format("/%s/_mapping", indexName));
        request.setJsonEntity(jsonMapping);
        Response response = performRequestWithTracing(request);
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException(String.format("Fail to create mapping on %s: %s", indexName, response));
        }
    }

    /**
     * Performs a request with the low level client.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    protected Response performRequest(Request request) {
        try {
            return lowLevelClient.performRequest(request);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Performs a request with the low level client inside a span named after the request endpoint.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    protected Response performRequestWithTracing(Request request) {
        try (Scope ignored = getScopedSpan("elastic" + request.getEndpoint(), request.toString())) {
            return performRequest(request);
        }
    }

    /**
     * Returns the body of the {@code /_nodes/_all} response.
     *
     * @throws NuxeoException if the request fails or its response cannot be read
     */
    @Override
    public String getNodesInfo() {
        return entityAsString(performRequestWithTracing(new Request("GET", "/_nodes/_all")));
    }

    /**
     * Returns the body of the {@code /_nodes/stats} response.
     *
     * @throws NuxeoException if the request fails or its response cannot be read
     */
    @Override
    public String getNodesStats() {
        return entityAsString(performRequestWithTracing(new Request("GET", "/_nodes/stats")));
    }

    /** Reads the whole response entity as a string. */
    private static String entityAsString(Response response) {
        try {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Checks whether an alias exists using a {@code HEAD} request.
     *
     * @return {@code true} on a 200 response, {@code false} on a 404 response
     * @throws IllegalStateException on any other response status
     */
    @Override
    public boolean aliasExists(String aliasName) {
        Response response = performRequestWithTracing(new Request("HEAD", String.format("/_alias/%s", aliasName)));
        return existsFromStatus(response, "alias", aliasName);
    }

    /**
     * Returns the name of the index an alias points to.
     *
     * @return the index name, or {@code null} if the alias does not exist
     * @throws NuxeoException if the alias response does not contain exactly one entry, or cannot be read
     */
    @Override
    public String getFirstIndexForAlias(String aliasName) {
        if (!aliasExists(aliasName)) {
            return null;
        }
        Response response = performRequestWithTracing(new Request("GET", String.format("/_alias/%s", aliasName)));
        try (InputStream is = response.getEntity().getContent()) {
            Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
            if (map.size() != 1) {
                throw new NuxeoException(String.format(
                        "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response));
            }
            return map.keySet().iterator().next();
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Makes an alias point to the given index, deleting the alias first when it already exists.
     *
     * @throws NuxeoException if an index named like the alias exists, or if the alias cannot be created
     */
    @Override
    public void updateAlias(String aliasName, String indexName) {
        // TODO do this in a single call to make it atomically
        if (aliasExists(aliasName)) {
            deleteAlias(aliasName);
        }
        if (indexExists(aliasName)) {
            throw new NuxeoException(
                    "Cannot create an alias because an index with the same name exists: " + aliasName);
        }
        createAlias(aliasName, indexName);
    }

    /**
     * Deletes an alias through the {@code /_all/_alias/} endpoint.
     *
     * @throws IllegalStateException if the response status is not 200
     */
    protected void deleteAlias(String aliasName) {
        Response response = performRequestWithTracing(
                new Request("DELETE", String.format("/_all/_alias/%s", aliasName)));
        int code = response.getStatusLine().getStatusCode();
        if (code != HttpStatus.SC_OK) {
            throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response));
        }
    }

    /**
     * Creates an alias on an index.
     *
     * @throws NuxeoException if the request fails or the response status is not 200
     */
    protected void createAlias(String aliasName, String indexName) {
        Response response = performRequestWithTracing(
                new Request("PUT", String.format("/%s/_alias/%s", indexName, aliasName)));
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new NuxeoException(
                    String.format("Fail to create alias %s on index %s: %s", aliasName, indexName, response));
        }
    }

    /**
     * Executes a bulk request inside an {@code elastic/_bulk} span.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    @Override
    public BulkResponse bulk(BulkRequest request) {
        try (Scope ignored = getScopedSpan("elastic/_bulk", "actions: " + request.numberOfActions())) {
            return client.bulk(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Executes a delete request.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    @Override
    public DeleteResponse delete(DeleteRequest request) {
        try {
            return client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Executes a search request inside an {@code elastic/_search} span.
     *
     * @throws NuxeoException wrapping any {@link IOException} or {@link ElasticsearchStatusException}
     */
    @Override
    public SearchResponse search(SearchRequest request) {
        try (Scope ignored = getScopedSpan("elastic/_search", request.toString())) {
            return client.search(request, RequestOptions.DEFAULT);
        } catch (IOException | ElasticsearchStatusException e) {
            // ElasticsearchStatusException is raised when using phrase prefix on keyword type
            throw new NuxeoException(e);
        }
    }

    /**
     * Executes a scroll request inside an {@code elastic/_scroll} span.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    @Override
    public SearchResponse searchScroll(SearchScrollRequest request) {
        try (Scope ignored = getScopedSpan("elastic/_scroll", request.toString())) {
            return client.scroll(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Executes a get request inside an {@code elastic/_get} span.
     *
     * @throws NuxeoException wrapping any {@link IOException} thrown by the client
     */
    @Override
    public GetResponse get(GetRequest request) {
        try (Scope ignored = getScopedSpan("elastic/_get", request.toString())) {
            return client.get(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Executes an index request inside an {@code elastic/_index} span.
     *
     * @throws ConcurrentUpdateException if Elasticsearch answers with a conflict status
     * @throws NuxeoException on any other {@link ElasticsearchStatusException} or {@link IOException}
     */
    @Override
    public IndexResponse index(IndexRequest request) {
        try (Scope ignored = getScopedSpan("elastic/_index", request.toString())) {
            return client.index(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchStatusException e) {
            if (RestStatus.CONFLICT.equals(e.status())) {
                throw new ConcurrentUpdateException(e);
            }
            throw new NuxeoException(e);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Starts a scoped client span and tags the current span with the thread name and the request description.
     *
     * @param name the span name
     * @param request the value stored in the {@code request} attribute
     * @return the scope to close once the traced call is done
     */
    protected Scope getScopedSpan(String name, String request) {
        Scope scope = Tracing.getTracer().spanBuilder(name).setSpanKind(Span.Kind.CLIENT).startScopedSpan();
        Map<String, AttributeValue> map = new HashMap<>();
        map.put("thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
        map.put("request", AttributeValue.stringAttributeValue(request));
        Tracing.getTracer().getCurrentSpan().putAttributes(map);
        return scope;
    }

    /**
     * Clears the given scroll ids.
     * <p>
     * A not found answer is not treated as an error: a {@code ClearScrollResponse(true, 0)} is returned instead.
     *
     * @throws NuxeoException on any other {@link ElasticsearchStatusException} or {@link IOException}
     */
    @Override
    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Clearing scroll ids: %s", Arrays.toString(request.getScrollIds().toArray())));
            }
            return client.clearScroll(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchStatusException e) {
            if (RestStatus.NOT_FOUND.equals(e.status())) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Scroll ids not found, they have certainly been already closed: %s",
                            Arrays.toString(request.getScrollIds().toArray())));
                }
                return new ClearScrollResponse(true, 0);
            }
            throw new NuxeoException(e);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns a bulk processor builder whose requests are sent asynchronously with the high level client.
     *
     * @param bulkListener the listener passed to the builder
     */
    @Override
    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener bulkListener) {
        return BulkProcessor.builder((request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
                bulkListener);
    }

    /**
     * Closes the low level client, logging a warning if that fails, and drops the references to both clients.
     */
    @Override
    public void close() {
        if (lowLevelClient != null) {
            try {
                lowLevelClient.close();
            } catch (IOException e) {
                log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e);
            }
            lowLevelClient = null;
        }
        client = null;
    }
}