001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.client; 020 021import static java.util.Collections.emptyMap; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.util.Map; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.apache.http.HttpEntity; 030import org.apache.http.HttpStatus; 031import org.apache.http.entity.ContentType; 032import org.apache.http.nio.entity.NStringEntity; 033import org.apache.http.util.EntityUtils; 034import org.elasticsearch.action.bulk.BulkRequest; 035import org.elasticsearch.action.bulk.BulkResponse; 036import org.elasticsearch.action.delete.DeleteRequest; 037import org.elasticsearch.action.delete.DeleteResponse; 038import org.elasticsearch.action.get.GetRequest; 039import org.elasticsearch.action.get.GetResponse; 040import org.elasticsearch.action.index.IndexRequest; 041import org.elasticsearch.action.index.IndexResponse; 042import org.elasticsearch.action.search.ClearScrollRequest; 043import org.elasticsearch.action.search.ClearScrollResponse; 044import org.elasticsearch.action.search.SearchRequest; 045import org.elasticsearch.action.search.SearchResponse; 046import org.elasticsearch.action.search.SearchScrollRequest; 047import org.elasticsearch.client.Response; 048import org.elasticsearch.client.RestClient; 049import org.elasticsearch.client.RestHighLevelClient; 050import org.elasticsearch.cluster.health.ClusterHealthStatus; 051import org.elasticsearch.common.xcontent.XContentHelper; 052import org.elasticsearch.common.xcontent.XContentType; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.elasticsearch.api.ESClient; 055 056/** 057 * @since 9.3 058 */ 059public class ESRestClient implements ESClient { 060 // TODO: add security sanitizer to make sure all parameters used to build requests are clean 061 private static final Log log = LogFactory.getLog(ESRestClient.class); 062 063 protected RestClient lowLevelClient; 064 065 protected RestHighLevelClient client; 066 067 public ESRestClient(RestClient lowLevelRestClient, RestHighLevelClient client) { 068 this.lowLevelClient = lowLevelRestClient; 069 this.client = client; 070 } 071 072 @Override 073 public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) { 074 ClusterHealthStatus healthStatus; 075 Response response; 076 try { 077 response = lowLevelClient.performRequest("GET", 078 String.format("/_cluster/health/%s?wait_for_status=yellow&timeout=%ds", 079 getIndexesAsString(indexNames), timeoutSecond)); 080 try (InputStream is = response.getEntity().getContent()) { 081 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 082 healthStatus = ClusterHealthStatus.fromString((String) map.get("status")); 083 } 084 } catch (IOException e) { 085 throw new NuxeoException(e); 086 } 087 switch (healthStatus) { 088 case GREEN: 089 log.info("Elasticsearch Cluster ready: " + response); 090 return true; 091 case YELLOW: 092 log.warn("Elasticsearch Cluster ready but not GREEN: " + response); 093 return false; 094 default: 095 String error = "Elasticsearch Cluster health status: " + healthStatus + ", not Yellow after " 096 + timeoutSecond + " give up: " + response; 097 throw new IllegalStateException(error); 098 } 099 } 100 101 protected String getIndexesAsString(String[] indexNames) { 102 return indexNames == null ? "" : String.join(",", indexNames); 103 } 104 105 @Override 106 public ClusterHealthStatus getHealthStatus(String[] indexNames) { 107 try { 108 Response response = lowLevelClient.performRequest("GET", 109 String.format("/_cluster/health/%s", getIndexesAsString(indexNames))); 110 try (InputStream is = response.getEntity().getContent()) { 111 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 112 return ClusterHealthStatus.fromString((String) map.get("status")); 113 } 114 } catch (IOException e) { 115 throw new NuxeoException(e); 116 } 117 } 118 119 @Override 120 public void refresh(String indexName) { 121 try { 122 lowLevelClient.performRequest("POST", "/" + indexName + "/_refresh"); 123 } catch (IOException e) { 124 throw new NuxeoException(e); 125 } 126 127 } 128 129 @Override 130 public void flush(String indexName) { 131 try { 132 lowLevelClient.performRequest("POST", "/" + indexName + "/_flush?wait_if_ongoing=true"); 133 } catch (IOException e) { 134 throw new NuxeoException(e); 135 } 136 } 137 138 @Override 139 public void optimize(String indexName) { 140 try { 141 lowLevelClient.performRequest("POST", "/" + indexName + "/_forcemerge?max_num_segments=1"); 142 } catch (IOException e) { 143 throw new NuxeoException(e); 144 } 145 } 146 147 @Override 148 public boolean indexExists(String indexName) { 149 Response response; 150 try { 151 response = lowLevelClient.performRequest("HEAD", "/" + indexName); 152 } catch (IOException e) { 153 throw new NuxeoException(e); 154 } 155 int code = response.getStatusLine().getStatusCode(); 156 if (code == HttpStatus.SC_OK) { 157 return true; 158 } else if (code == HttpStatus.SC_NOT_FOUND) { 159 return false; 160 } 161 throw new IllegalStateException(String.format("Checking index %s returns: %s", indexName, response)); 162 } 163 164 @Override 165 public boolean mappingExists(String indexName, String type) { 166 Response response; 167 try { 168 response = lowLevelClient.performRequest("HEAD", String.format("/%s/_mapping/%s", indexName, type)); 169 } catch (IOException e) { 170 throw new NuxeoException(e); 171 } 172 int code = response.getStatusLine().getStatusCode(); 173 if (code == HttpStatus.SC_OK) { 174 return true; 175 } else if (code == HttpStatus.SC_NOT_FOUND) { 176 return false; 177 } 178 throw new IllegalStateException(String.format("Checking mapping %s returns: %s", indexName, response)); 179 } 180 181 @Override 182 public void deleteIndex(String indexName, int timeoutSecond) { 183 Response response; 184 try { 185 response = lowLevelClient.performRequest("DELETE", 186 String.format("/%s?master_timeout=%ds", indexName, timeoutSecond)); 187 } catch (IOException e) { 188 throw new NuxeoException(e); 189 } 190 int code = response.getStatusLine().getStatusCode(); 191 if (code != HttpStatus.SC_OK) { 192 throw new IllegalStateException(String.format("Deleting %s returns: %s", indexName, response)); 193 } 194 } 195 196 @Override 197 public void createIndex(String indexName, String jsonSettings) { 198 HttpEntity entity = new NStringEntity(jsonSettings, ContentType.APPLICATION_JSON); 199 Response response; 200 try { 201 response = lowLevelClient.performRequest("PUT", "/" + indexName, emptyMap(), entity); 202 } catch (IOException e) { 203 throw new NuxeoException(e); 204 } 205 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 206 throw new NuxeoException("Fail to create index: " + indexName + " :" + response); 207 } 208 } 209 210 @Override 211 public void createMapping(String indexName, String type, String jsonMapping) { 212 HttpEntity entity = new NStringEntity(jsonMapping, ContentType.APPLICATION_JSON); 213 Response response; 214 try { 215 response = lowLevelClient.performRequest("PUT", String.format("/%s/%s/_mapping", indexName, type), 216 emptyMap(), entity); 217 } catch (IOException e) { 218 throw new NuxeoException(e); 219 } 220 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 221 throw new NuxeoException(String.format("Fail to create mapping on %s/%s: %s", indexName, type, response)); 222 } 223 224 } 225 226 @Override 227 public String getNodesInfo() { 228 try { 229 Response response = lowLevelClient.performRequest("GET", "/_nodes/_all"); 230 return EntityUtils.toString(response.getEntity()); 231 } catch (IOException e) { 232 throw new NuxeoException(e); 233 } 234 } 235 236 @Override 237 public String getNodesStats() { 238 try { 239 Response response = lowLevelClient.performRequest("GET", "/_nodes/stats"); 240 return EntityUtils.toString(response.getEntity()); 241 } catch (IOException e) { 242 throw new NuxeoException(e); 243 } 244 } 245 246 @Override 247 public boolean aliasExists(String aliasName) { 248 Response response; 249 try { 250 response = lowLevelClient.performRequest("HEAD", String.format("/_alias/%s", aliasName)); 251 } catch (IOException e) { 252 throw new NuxeoException(e); 253 } 254 int code = response.getStatusLine().getStatusCode(); 255 if (code == HttpStatus.SC_OK) { 256 return true; 257 } else if (code == HttpStatus.SC_NOT_FOUND) { 258 return false; 259 } 260 throw new IllegalStateException(String.format("Checking alias %s returns: %s", aliasName, response)); 261 } 262 263 @Override 264 public String getFirstIndexForAlias(String aliasName) { 265 if (!aliasExists(aliasName)) { 266 return null; 267 } 268 try { 269 Response response = lowLevelClient.performRequest("GET", String.format("/_alias/%s", aliasName)); 270 try (InputStream is = response.getEntity().getContent()) { 271 Map<String, Object> map = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); 272 if (map.size() != 1) { 273 throw new NuxeoException(String.format( 274 "Expecting alias that point to a single index, alias: %s, got: %s", aliasName, response)); 275 } 276 return map.keySet().iterator().next(); 277 } 278 } catch (IOException e) { 279 throw new NuxeoException(e); 280 } 281 } 282 283 @Override 284 public void updateAlias(String aliasName, String indexName) { 285 // TODO do this in a single call to make it atomically 286 if (aliasExists(aliasName)) { 287 deleteAlias(aliasName); 288 } 289 if (indexExists(aliasName)) { 290 throw new NuxeoException("Can create an alias because an index with the same name exists: " + aliasName); 291 } 292 createAlias(aliasName, indexName); 293 } 294 295 protected void deleteAlias(String aliasName) { 296 Response response; 297 try { 298 response = lowLevelClient.performRequest("DELETE", String.format("/_all/_alias/%s", aliasName)); 299 } catch (IOException e) { 300 throw new NuxeoException(e); 301 } 302 int code = response.getStatusLine().getStatusCode(); 303 if (code != HttpStatus.SC_OK) { 304 throw new IllegalStateException(String.format("Deleting %s alias: %s", aliasName, response)); 305 } 306 } 307 308 protected void createAlias(String aliasName, String indexName) { 309 Response response; 310 try { 311 response = lowLevelClient.performRequest("PUT", String.format("/%s/_alias/%s", indexName, aliasName), 312 emptyMap()); 313 } catch (IOException e) { 314 throw new NuxeoException(e); 315 } 316 if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { 317 throw new NuxeoException("Fail to create alias: " + indexName + " :" + response); 318 } 319 } 320 321 @Override 322 public BulkResponse bulk(BulkRequest request) { 323 try { 324 return client.bulk(request); 325 } catch (IOException e) { 326 throw new NuxeoException(e); 327 } 328 } 329 330 @Override 331 public DeleteResponse delete(DeleteRequest request) { 332 try { 333 return client.delete(request); 334 } catch (IOException e) { 335 throw new NuxeoException(e); 336 } 337 } 338 339 @Override 340 public SearchResponse search(SearchRequest request) { 341 try { 342 return client.search(request); 343 } catch (IOException e) { 344 throw new NuxeoException(e); 345 } 346 } 347 348 @Override 349 public SearchResponse searchScroll(SearchScrollRequest request) { 350 try { 351 return client.searchScroll(request); 352 } catch (IOException e) { 353 throw new NuxeoException(e); 354 } 355 } 356 357 @Override 358 public GetResponse get(GetRequest request) { 359 try { 360 return client.get(request); 361 } catch (IOException e) { 362 throw new NuxeoException(e); 363 } 364 } 365 366 @Override 367 public IndexResponse index(IndexRequest request) { 368 try { 369 return client.index(request); 370 } catch (IOException e) { 371 throw new NuxeoException(e); 372 } 373 } 374 375 @Override 376 public ClearScrollResponse clearScroll(ClearScrollRequest request) { 377 try { 378 return client.clearScroll(request); 379 } catch (IOException e) { 380 throw new NuxeoException(e); 381 } 382 } 383 384 @Override 385 public void close() { 386 if (lowLevelClient != null) { 387 try { 388 lowLevelClient.close(); 389 } catch (IOException e) { 390 log.warn("Fail to close the Elasticsearch low level RestClient: " + e.getMessage(), e); 391 } 392 lowLevelClient = null; 393 } 394 client = null; 395 } 396}