/*
 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.nuxeo.elasticsearch.api.ESClientInitializationService;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
import org.nuxeo.runtime.api.Framework;

import com.google.common.util.concurrent.ListenableFuture;

/**
 * {@link ElasticSearchAdmin} implementation that either connects to a remote Elasticsearch cluster or starts an
 * embedded node, then creates the configured indexes and mappings.
 * <p>
 * Worker scheduling related methods are not implemented here and throw {@link UnsupportedOperationException}.
 *
 * @since 6.0
 */
public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);

    /** Timeout passed to the cluster health request. */
    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";

    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);

    /** Repository name to index name. */
    private final Map<String, String> indexNames = new HashMap<>();

    /** Index name to repository name. */
    private final Map<String, String> repoNames = new HashMap<>();

    private final Map<String, ElasticSearchIndexConfig> indexConfig;

    private Node localNode;

    private Client client;

    private boolean indexInitDone = false;

    private final ElasticSearchLocalConfig localConfig;

    private final ElasticSearchRemoteConfig remoteConfig;

    private final ESClientInitializationService clientInitService;

    private String[] includeSourceFields;

    private String[] excludeSourceFields;

    private boolean embedded = true;

    /** Repositories whose mapping has been created by {@link #initIndex} without the drop flag. */
    private final List<String> repositoryInitialized = new ArrayList<>();

    /**
     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
     * <p>
     * This constructor provides no client initialization service, so a non-null remote configuration fails with an
     * {@link IllegalStateException}.
     *
     * @deprecated since 9.1, use
     *             {@link #ElasticSearchAdminImpl(ElasticSearchLocalConfig, ElasticSearchRemoteConfig, Map, ESClientInitializationService)}
     *             instead.
     */
    @Deprecated
    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
            Map<String, ElasticSearchIndexConfig> indexConfig) {
        this(localConfig, remoteConfig, indexConfig, null);
    }

    /**
     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
     * The transport client initialization can be customized.
     * <p>
     * The constructor connects to Elasticsearch and initializes the indexes before returning.
     *
     * @param localConfig the embedded node configuration, used when {@code remoteConfig} is null
     * @param remoteConfig the remote cluster configuration, may be null
     * @param indexConfig the index configurations, iterated through their values
     * @param clientInitService the service building the settings and the transport client for a remote cluster
     * @since 9.1
     */
    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
            Map<String, ElasticSearchIndexConfig> indexConfig, ESClientInitializationService clientInitService) {
        this.remoteConfig = remoteConfig;
        this.localConfig = localConfig;
        this.indexConfig = indexConfig;
        this.clientInitService = clientInitService;
        connect();
        initializeIndexes();
    }

    /**
     * Creates the client if there is none yet, using the remote configuration when present and an embedded node
     * otherwise, then waits for the cluster health.
     */
    private void connect() {
        if (client != null) {
            return;
        }
        if (remoteConfig != null) {
            client = connectToRemote(remoteConfig);
            embedded = false;
        } else {
            localNode = createEmbeddedNode(localConfig);
            client = connectToEmbedded();
            embedded = true;
        }
        checkClusterHealth();
        log.info("ES Connected");
    }

    /**
     * Closes the client and the embedded node if any, and marks the indexes as not initialized.
     */
    public void disconnect() {
        if (client != null) {
            client.close();
            client = null;
            indexInitDone = false;
            log.info("ES Disconnected");
        }
        if (localNode != null) {
            localNode.close();
            localNode = null;
            log.info("ES embedded Node Stopped");
        }
    }

    /**
     * Builds an embedded (local in JVM) node from the given configuration.
     *
     * @throws IllegalStateException if {@code conf} is null
     */
    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
        log.info("ES embedded Node Initializing (local in JVM)");
        if (conf == null) {
            throw new IllegalStateException("No embedded configuration defined");
        }
        if (!Framework.isTestModeSet()) {
            log.warn("Elasticsearch embedded configuration is ONLY for testing"
                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
        }
        Builder sBuilder = Settings.settingsBuilder();
        sBuilder.put("http.enabled", conf.httpEnabled())
                .put("network.host", conf.getNetworkHost())
                .put("path.home", conf.getHomePath())
                .put("path.data", conf.getDataPath())
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 0)
                .put("cluster.name", conf.getClusterName())
                .put("node.name", conf.getNodeName())
                .put("http.netty.worker_count", 4)
                .put("http.cors.enabled", true)
                .put("http.cors.allow-origin", "*")
                .put("http.cors.allow-credentials", true)
                .put("http.cors.allow-headers", "Authorization, X-Requested-With, Content-Type, Content-Length")
                .put("cluster.routing.allocation.disk.threshold_enabled", false)
                .put("http.port", conf.getHttpPort());
        if (conf.getIndexStorageType() != null) {
            sBuilder.put("index.store.type", conf.getIndexStorageType());
        }
        Settings settings = sBuilder.build();
        if (log.isDebugEnabled()) {
            log.debug("Using settings: " + settings.toDelimitedString(','));
        }
        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
        assert ret != null : "Can not create an embedded ES Node";
        return ret;
    }

    /**
     * Starts the embedded node and returns its client.
     */
    private Client connectToEmbedded() {
        log.info("Connecting to embedded ES");
        Client ret = localNode.start().client();
        assert ret != null : "Can not connect to embedded ES Node";
        return ret;
    }

    /**
     * Builds a transport client with the client initialization service and registers the configured addresses.
     *
     * @throws IllegalStateException if no client initialization service has been provided
     */
    private Client connectToRemote(ElasticSearchRemoteConfig config) {
        log.info("Connecting to remote ES cluster: " + config);
        if (clientInitService == null) {
            // happens with the deprecated constructor, fail with an explicit message instead of an NPE
            throw new IllegalStateException("No client initialization service defined to connect to remote ES");
        }

        Settings settings = clientInitService.initializeSettings(config);
        if (log.isDebugEnabled()) {
            log.debug("Using settings: " + settings.toDelimitedString(','));
        }

        TransportClient ret = clientInitService.initializeClient(settings);
        assert ret != null : "Unable to create a remote client";

        String[] addresses = config.getAddresses();
        if (addresses == null) {
            log.error("You need to provide an addressList to join a cluster");
        } else {
            for (String item : addresses) {
                addTransportAddress(ret, item);
            }
        }
        return ret;
    }

    /**
     * Registers a {@code host:port} address on the transport client. An address that has no port, a non numeric port
     * or an unresolvable host is logged as an error and skipped.
     */
    private void addTransportAddress(TransportClient transportClient, String item) {
        String[] address = item.split(":");
        if (address.length < 2) {
            log.error("Invalid transport address: " + item + ", expecting host:port");
            return;
        }
        log.debug("Add transport address: " + item);
        try {
            InetAddress inet = InetAddress.getByName(address[0]);
            transportClient.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
        } catch (UnknownHostException e) {
            log.error("Unable to resolve host " + address[0], e);
        } catch (NumberFormatException e) {
            log.error("Invalid port in transport address: " + item, e);
        }
    }

    /**
     * Waits for the cluster, restricted to the given indexes if any, to reach the yellow health status.
     *
     * @throws IllegalStateException if there is no client
     * @throws RuntimeException if the health request times out or if no node is available
     */
    private void checkClusterHealth(String... indexNames) {
        if (client == null) {
            throw new IllegalStateException("No es client available");
        }
        String errorMessage = null;
        Throwable cause = null;
        try {
            if (log.isDebugEnabled()) {
                log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
            }
            ClusterHealthResponse ret = client.admin()
                                              .cluster()
                                              .prepareHealth(indexNames)
                                              .setTimeout(TIMEOUT_WAIT_FOR_CLUSTER)
                                              .setWaitForYellowStatus()
                                              .get();
            if (ret.isTimedOut()) {
                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
                        + ret;
            } else {
                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
                    log.warn("Es Cluster ready but not GREEN: " + ret);
                } else {
                    log.info("ES Cluster ready: " + ret);
                }
            }
        } catch (NoNodeAvailableException e) {
            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
            cause = e;
        }
        if (errorMessage != null) {
            log.error(errorMessage);
            // cause is null on timeout, which is accepted by the constructor
            throw new RuntimeException(errorMessage, cause);
        }
    }

    /**
     * Registers the repository/index associations of the document indexes, merges their include and exclude source
     * fields, then initializes all the indexes without dropping them.
     */
    private void initializeIndexes() {
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            if (conf.isDocumentIndex()) {
                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
                indexNames.put(conf.getRepositoryName(), conf.getName());
                repoNames.put(conf.getName(), conf.getRepositoryName());
                Set<String> set = new LinkedHashSet<>();
                if (includeSourceFields != null) {
                    set.addAll(Arrays.asList(includeSourceFields));
                }
                set.addAll(Arrays.asList(conf.getIncludes()));
                if (set.contains(ALL_FIELDS)) {
                    // the wildcard supersedes any other included field
                    set.clear();
                    set.add(ALL_FIELDS);
                }
                includeSourceFields = set.toArray(new String[set.size()]);
                set.clear();
                if (excludeSourceFields != null) {
                    set.addAll(Arrays.asList(excludeSourceFields));
                }
                set.addAll(Arrays.asList(conf.getExcludes()));
                excludeSourceFields = set.toArray(new String[set.size()]);
            }

        }
        initIndexes(false);
    }

    // Admin Impl =============================================================
    @Override
    public void refreshRepositoryIndex(String repositoryName) {
        if (log.isDebugEnabled()) {
            log.debug("Refreshing index associated with repo: " + repositoryName);
        }
        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
        if (log.isDebugEnabled()) {
            log.debug("Refreshing index done");
        }
    }

    /**
     * @throws NoSuchElementException if no index is associated with the repository
     */
    @Override
    public String getIndexNameForRepository(String repositoryName) {
        String ret = indexNames.get(repositoryName);
        if (ret == null) {
            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
        }
        return ret;
    }

    @Override
    public List<String> getIndexNamesForType(String type) {
        List<String> names = new ArrayList<>();
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            if (type.equals(conf.getType())) {
                names.add(conf.getName());
            }
        }
        return names;
    }

    /**
     * @throws NoSuchElementException if no index is configured with the type
     */
    @Override
    public String getIndexNameForType(String type) {
        List<String> names = getIndexNamesForType(type);
        if (names.isEmpty()) {
            throw new NoSuchElementException("No index defined for type: " + type);
        }
        return names.get(0);
    }

    @Override
    public void flushRepositoryIndex(String repositoryName) {
        log.warn("Flushing index associated with repo: " + repositoryName);
        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
        log.info("Flushing index done");
    }

    @Override
    public void refresh() {
        for (String repositoryName : indexNames.keySet()) {
            refreshRepositoryIndex(repositoryName);
        }
    }

    @Override
    public void flush() {
        for (String repositoryName : indexNames.keySet()) {
            flushRepositoryIndex(repositoryName);
        }
    }

    @Override
    public void optimizeIndex(String indexName) {
        log.warn("Optimizing index: " + indexName);
        // only indexes present in the configuration are force merged
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            if (conf.getName().equals(indexName)) {
                getClient().admin().indices().prepareForceMerge(indexName).get();
            }
        }
        log.info("Optimize done");
    }

    @Override
    public void optimizeRepositoryIndex(String repositoryName) {
        optimizeIndex(getIndexNameForRepository(repositoryName));
    }

    @Override
    public void optimize() {
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            optimizeIndex(conf.getName());
        }
    }

    @Override
    public Client getClient() {
        return client;
    }

    @Override
    public void initIndexes(boolean dropIfExists) {
        indexInitDone = false;
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            initIndex(conf, dropIfExists);
        }
        log.info("ES Service ready");
        indexInitDone = true;
    }

    @Override
    public void dropAndInitIndex(String indexName) {
        log.info("Drop and init index: " + indexName);
        indexInitDone = false;
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            if (conf.getName().equals(indexName)) {
                initIndex(conf, true);
            }
        }
        indexInitDone = true;
    }

    @Override
    public void dropAndInitRepositoryIndex(String repositoryName) {
        log.info("Drop and init index of repository: " + repositoryName);
        indexInitDone = false;
        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
                initIndex(conf, true);
            }
        }
        indexInitDone = true;
    }

    @Override
    public List<String> getRepositoryNames() {
        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
    }

    /**
     * Creates the index and its mapping when they are missing, then waits for the index to be ready.
     * <p>
     * Nothing is done when {@code conf.mustCreate()} is false. When the index exists and {@code dropIfExists} is set,
     * the index is deleted and recreated.
     *
     * @param conf the configuration of the index to initialize
     * @param dropIfExists whether an existing index must be deleted first
     */
    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
        if (!conf.mustCreate()) {
            return;
        }
        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
        boolean mappingExists = false;
        boolean indexExists = getClient().admin()
                                         .indices()
                                         .prepareExists(conf.getName())
                                         .execute()
                                         .actionGet()
                                         .isExists();
        if (indexExists) {
            if (!dropIfExists) {
                log.debug("Index " + conf.getName() + " already exists");
                mappingExists = getClient().admin()
                                           .indices()
                                           .prepareGetMappings(conf.getName())
                                           .execute()
                                           .actionGet()
                                           .getMappings()
                                           .get(conf.getName())
                                           .containsKey(conf.getType());
            } else {
                if (!Framework.isTestModeSet()) {
                    log.warn(String.format(
                            "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index",
                            conf.getName(), conf.getType()));
                }
                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
                indexExists = false;
            }
        }
        if (!indexExists) {
            log.info(String.format("Creating index: %s", conf.getName()));
            if (log.isDebugEnabled()) {
                log.debug("Using settings: " + conf.getSettings());
            }
            getClient().admin()
                       .indices()
                       .prepareCreate(conf.getName())
                       .setSettings(conf.getSettings())
                       .execute()
                       .actionGet();
        }
        if (!mappingExists) {
            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
            if (log.isDebugEnabled()) {
                log.debug("Using mapping: " + conf.getMapping());
            }
            getClient().admin()
                       .indices()
                       .preparePutMapping(conf.getName())
                       .setType(conf.getType())
                       .setSource(conf.getMapping())
                       .execute()
                       .actionGet();
            // an explicit drop is not reported as a newly initialized repository
            if (!dropIfExists && conf.getRepositoryName() != null) {
                repositoryInitialized.add(conf.getRepositoryName());
            }
        }
        // make sure the index is ready before returning
        checkClusterHealth(conf.getName());
    }

    @Override
    public long getPendingWorkerCount() {
        // impl of scheduling is left to the ESService
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public long getRunningWorkerCount() {
        // impl of scheduling is left to the ESService
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public int getTotalCommandProcessed() {
        return totalCommandProcessed.get();
    }

    @Override
    public boolean isEmbedded() {
        return embedded;
    }

    @Override
    public boolean useExternalVersion() {
        if (isEmbedded()) {
            return localConfig.useExternalVersion();
        }
        return remoteConfig.useExternalVersion();
    }

    @Override
    public boolean isIndexingInProgress() {
        // impl of scheduling is left to the ESService
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override
    public ListenableFuture<Boolean> prepareWaitForIndexing() {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Get the elastic search indexes for searches
     *
     * @param searchRepositories the repositories to search, an empty list selects the indexes of all the repositories
     * @return the index names, in the order of {@code searchRepositories} when it is not empty
     * @throws NoSuchElementException if a repository has no associated index
     */
    String[] getSearchIndexes(List<String> searchRepositories) {
        if (searchRepositories.isEmpty()) {
            Collection<String> values = indexNames.values();
            return values.toArray(new String[values.size()]);
        }
        String[] ret = new String[searchRepositories.size()];
        int i = 0;
        for (String repo : searchRepositories) {
            ret[i++] = getIndexNameForRepository(repo);
        }
        return ret;
    }

    /**
     * Returns true once the last index initialization has completed and the client has not been disconnected since.
     */
    public boolean isReady() {
        return indexInitDone;
    }

    String[] getIncludeSourceFields() {
        return includeSourceFields;
    }

    String[] getExcludeSourceFields() {
        return excludeSourceFields;
    }

    /**
     * Returns the internal map of index name to repository name, not a copy.
     */
    Map<String, String> getRepositoryMap() {
        return repoNames;
    }

    /**
     * Get the list of repository names that have their index created.
     * <p>
     * The returned list is the internal one, not a copy.
     */
    public List<String> getInitializedRepositories() {
        return repositoryInitialized;
    }
}