001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 024 025import java.net.InetAddress; 026import java.net.UnknownHostException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.LinkedHashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.NoSuchElementException; 036import java.util.Set; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 042import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 043import org.elasticsearch.client.Client; 044import org.elasticsearch.client.transport.NoNodeAvailableException; 045import org.elasticsearch.client.transport.TransportClient; 046import org.elasticsearch.cluster.health.ClusterHealthStatus; 047import org.elasticsearch.common.settings.Settings; 048import org.elasticsearch.common.settings.Settings.Builder; 049import org.elasticsearch.common.transport.InetSocketTransportAddress; 050import org.elasticsearch.node.Node; 051import org.elasticsearch.node.NodeBuilder; 052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 053import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 054import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 055import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 056import org.nuxeo.runtime.api.Framework; 057 058import com.google.common.util.concurrent.ListenableFuture; 059 060/** 061 * @since 6.0 062 */ 063public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 064 private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class); 065 066 private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s"; 067 068 final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 069 070 private final Map<String, String> indexNames = new HashMap<>(); 071 072 private final Map<String, String> repoNames = new HashMap<>(); 073 074 private final Map<String, ElasticSearchIndexConfig> indexConfig; 075 076 private Node localNode; 077 078 private Client client; 079 080 private boolean indexInitDone = false; 081 082 private final ElasticSearchLocalConfig localConfig; 083 084 private final ElasticSearchRemoteConfig remoteConfig; 085 086 private String[] includeSourceFields; 087 088 private String[] excludeSourceFields; 089 090 private boolean embedded = true; 091 092 private List<String> repositoryInitialized = new ArrayList<>(); 093 094 /** 095 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 096 */ 097 public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig, 098 Map<String, ElasticSearchIndexConfig> indexConfig) { 099 this.remoteConfig = remoteConfig; 100 this.localConfig = localConfig; 101 this.indexConfig = indexConfig; 102 connect(); 103 initializeIndexes(); 104 } 105 106 private void connect() { 107 if (client != null) { 108 return; 109 } 110 if (remoteConfig != null) { 111 client = connectToRemote(remoteConfig); 112 embedded = false; 113 } else { 114 localNode = createEmbeddedNode(localConfig); 115 client = connectToEmbedded(); 116 embedded = true; 117 } 118 checkClusterHealth(); 119 log.info("ES Connected"); 120 } 121 122 public void disconnect() { 123 if (client != null) { 124 client.close(); 125 client = null; 126 indexInitDone = false; 127 log.info("ES Disconnected"); 128 } 129 if (localNode != null) { 130 localNode.close(); 131 localNode = null; 132 log.info("ES embedded Node Stopped"); 133 } 134 } 135 136 private Node createEmbeddedNode(ElasticSearchLocalConfig conf) { 137 log.info("ES embedded Node Initializing (local in JVM)"); 138 if (conf == null) { 139 throw new IllegalStateException("No embedded configuration defined"); 140 } 141 if (!Framework.isTestModeSet()) { 142 log.warn("Elasticsearch embedded configuration is ONLY for testing" 143 + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production."); 144 } 145 Builder sBuilder = Settings.settingsBuilder(); 146 sBuilder.put("http.enabled", conf.httpEnabled()) 147 .put("network.host", conf.getNetworkHost()) 148 .put("path.home", conf.getHomePath()) 149 .put("path.data", conf.getDataPath()) 150 .put("index.number_of_shards", 1) 151 .put("index.number_of_replicas", 0) 152 .put("cluster.name", conf.getClusterName()) 153 .put("node.name", conf.getNodeName()) 154 .put("http.netty.worker_count", 4) 155 .put("http.cors.enabled", true) 156 .put("cluster.routing.allocation.disk.threshold_enabled", false) 157 .put("http.port", conf.getHttpPort()); 158 if (conf.getIndexStorageType() != null) { 159 sBuilder.put("index.store.type", conf.getIndexStorageType()); 160 } 161 Settings settings = sBuilder.build(); 162 log.debug("Using settings: " + settings.toDelimitedString(',')); 163 Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node(); 164 assert ret != null : "Can not create an embedded ES Node"; 165 return ret; 166 } 167 168 private Client connectToEmbedded() { 169 log.info("Connecting to embedded ES"); 170 Client ret = localNode.start().client(); 171 assert ret != null : "Can not connect to embedded ES Node"; 172 return ret; 173 } 174 175 private Client connectToRemote(ElasticSearchRemoteConfig config) { 176 log.info("Connecting to remote ES cluster: " + config); 177 Builder builder = Settings.settingsBuilder() 178 .put("cluster.name", config.getClusterName()) 179 .put("client.transport.nodes_sampler_interval", config.getSamplerInterval()) 180 .put("client.transport.ping_timeout", config.getPingTimeout()) 181 .put("client.transport.ignore_cluster_name", config.isIgnoreClusterName()) 182 .put("client.transport.sniff", config.isClusterSniff()); 183 Settings settings = builder.build(); 184 if (log.isDebugEnabled()) { 185 log.debug("Using settings: " + settings.toDelimitedString(',')); 186 } 187 TransportClient ret = TransportClient.builder().settings(settings).build(); 188 String[] addresses = config.getAddresses(); 189 if (addresses == null) { 190 log.error("You need to provide an addressList to join a cluster"); 191 } else { 192 for (String item : config.getAddresses()) { 193 String[] address = item.split(":"); 194 log.debug("Add transport address: " + item); 195 try { 196 InetAddress inet = InetAddress.getByName(address[0]); 197 ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1]))); 198 } catch (UnknownHostException e) { 199 log.error("Unable to resolve host " + address[0], e); 200 } 201 } 202 } 203 assert ret != null : "Unable to create a remote client"; 204 return ret; 205 } 206 207 private void checkClusterHealth(String... indexNames) { 208 if (client == null) { 209 throw new IllegalStateException("No es client available"); 210 } 211 String errorMessage = null; 212 try { 213 log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames)); 214 ClusterHealthResponse ret = client.admin() 215 .cluster() 216 .prepareHealth(indexNames) 217 .setTimeout(TIMEOUT_WAIT_FOR_CLUSTER) 218 .setWaitForYellowStatus() 219 .get(); 220 if (ret.isTimedOut()) { 221 errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: " 222 + ret; 223 } else { 224 if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) { 225 log.warn("Es Cluster ready but not GREEN: " + ret); 226 } else { 227 log.info("ES Cluster ready: " + ret); 228 } 229 } 230 } catch (NoNodeAvailableException e) { 231 errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(); 232 } 233 if (errorMessage != null) { 234 log.error(errorMessage); 235 throw new RuntimeException(errorMessage); 236 } 237 } 238 239 private void initializeIndexes() { 240 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 241 if (conf.isDocumentIndex()) { 242 log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName()); 243 indexNames.put(conf.getRepositoryName(), conf.getName()); 244 repoNames.put(conf.getName(), conf.getRepositoryName()); 245 Set<String> set = new LinkedHashSet<>(); 246 if (includeSourceFields != null) { 247 set.addAll(Arrays.asList(includeSourceFields)); 248 } 249 set.addAll(Arrays.asList(conf.getIncludes())); 250 if (set.contains(ALL_FIELDS)) { 251 set.clear(); 252 set.add(ALL_FIELDS); 253 } 254 includeSourceFields = set.toArray(new String[set.size()]); 255 set.clear(); 256 if (excludeSourceFields != null) { 257 set.addAll(Arrays.asList(excludeSourceFields)); 258 } 259 set.addAll(Arrays.asList(conf.getExcludes())); 260 excludeSourceFields = set.toArray(new String[set.size()]); 261 } 262 263 } 264 initIndexes(false); 265 } 266 267 // Admin Impl ============================================================= 268 @Override 269 public void refreshRepositoryIndex(String repositoryName) { 270 if (log.isDebugEnabled()) { 271 log.debug("Refreshing index associated with repo: " + repositoryName); 272 } 273 getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet(); 274 if (log.isDebugEnabled()) { 275 log.debug("Refreshing index done"); 276 } 277 } 278 279 @Override 280 public String getIndexNameForRepository(String repositoryName) { 281 String ret = indexNames.get(repositoryName); 282 if (ret == null) { 283 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 284 } 285 return ret; 286 } 287 288 @Override 289 public List<String> getIndexNamesForType(String type) { 290 List<String> indexNames = new ArrayList<>(); 291 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 292 if (type.equals(conf.getType())) { 293 indexNames.add(conf.getName()); 294 } 295 } 296 return indexNames; 297 } 298 299 @Override 300 public String getIndexNameForType(String type) { 301 List<String> indexNames = getIndexNamesForType(type); 302 if (indexNames.isEmpty()) { 303 throw new NoSuchElementException("No index defined for type: " + type); 304 } 305 return indexNames.get(0); 306 } 307 308 @Override 309 public void flushRepositoryIndex(String repositoryName) { 310 log.warn("Flushing index associated with repo: " + repositoryName); 311 getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet(); 312 log.info("Flushing index done"); 313 } 314 315 @Override 316 public void refresh() { 317 for (String repositoryName : indexNames.keySet()) { 318 refreshRepositoryIndex(repositoryName); 319 } 320 } 321 322 @Override 323 public void flush() { 324 for (String repositoryName : indexNames.keySet()) { 325 flushRepositoryIndex(repositoryName); 326 } 327 } 328 329 @Override 330 public void optimizeIndex(String indexName) { 331 log.warn("Optimizing index: " + indexName); 332 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 333 if (conf.getName().equals(indexName)) { 334 getClient().admin().indices().prepareForceMerge(indexName).get(); 335 } 336 } 337 log.info("Optimize done"); 338 } 339 340 @Override 341 public void optimizeRepositoryIndex(String repositoryName) { 342 optimizeIndex(getIndexNameForRepository(repositoryName)); 343 } 344 345 @Override 346 public void optimize() { 347 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 348 optimizeIndex(conf.getName()); 349 } 350 } 351 352 @Override 353 public Client getClient() { 354 return client; 355 } 356 357 @Override 358 public void initIndexes(boolean dropIfExists) { 359 indexInitDone = false; 360 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 361 initIndex(conf, dropIfExists); 362 } 363 log.info("ES Service ready"); 364 indexInitDone = true; 365 } 366 367 @Override 368 public void dropAndInitIndex(String indexName) { 369 log.info("Drop and init index: " + indexName); 370 indexInitDone = false; 371 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 372 if (conf.getName().equals(indexName)) { 373 initIndex(conf, true); 374 } 375 } 376 indexInitDone = true; 377 } 378 379 @Override 380 public void dropAndInitRepositoryIndex(String repositoryName) { 381 log.info("Drop and init index of repository: " + repositoryName); 382 indexInitDone = false; 383 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 384 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 385 initIndex(conf, true); 386 } 387 } 388 indexInitDone = true; 389 } 390 391 @Override 392 public List<String> getRepositoryNames() { 393 return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet())); 394 } 395 396 void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 397 if (!conf.mustCreate()) { 398 return; 399 } 400 log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType())); 401 boolean mappingExists = false; 402 boolean indexExists = getClient().admin() 403 .indices() 404 .prepareExists(conf.getName()) 405 .execute() 406 .actionGet() 407 .isExists(); 408 if (indexExists) { 409 if (!dropIfExists) { 410 log.debug("Index " + conf.getName() + " already exists"); 411 mappingExists = getClient().admin() 412 .indices() 413 .prepareGetMappings(conf.getName()) 414 .execute() 415 .actionGet() 416 .getMappings() 417 .get(conf.getName()) 418 .containsKey(conf.getType()); 419 } else { 420 if (!Framework.isTestModeSet()) { 421 log.warn(String.format( 422 "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index", 423 conf.getName(), conf.getType())); 424 } 425 getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet(); 426 indexExists = false; 427 } 428 } 429 if (!indexExists) { 430 log.info(String.format("Creating index: %s", conf.getName())); 431 if (log.isDebugEnabled()) { 432 log.debug("Using settings: " + conf.getSettings()); 433 } 434 getClient().admin() 435 .indices() 436 .prepareCreate(conf.getName()) 437 .setSettings(conf.getSettings()) 438 .execute() 439 .actionGet(); 440 } 441 if (!mappingExists) { 442 log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName())); 443 if (log.isDebugEnabled()) { 444 log.debug("Using mapping: " + conf.getMapping()); 445 } 446 getClient().admin() 447 .indices() 448 .preparePutMapping(conf.getName()) 449 .setType(conf.getType()) 450 .setSource(conf.getMapping()) 451 .execute() 452 .actionGet(); 453 if (!dropIfExists && conf.getRepositoryName() != null) { 454 repositoryInitialized.add(conf.getRepositoryName()); 455 } 456 } 457 // make sure the index is ready before returning 458 checkClusterHealth(conf.getName()); 459 } 460 461 @Override 462 public long getPendingWorkerCount() { 463 // impl of scheduling is left to the ESService 464 throw new UnsupportedOperationException("Not implemented"); 465 } 466 467 @Override 468 public long getRunningWorkerCount() { 469 // impl of scheduling is left to the ESService 470 throw new UnsupportedOperationException("Not implemented"); 471 } 472 473 @Override 474 public int getTotalCommandProcessed() { 475 return totalCommandProcessed.get(); 476 } 477 478 @Override 479 public boolean isEmbedded() { 480 return embedded; 481 } 482 483 @Override 484 public boolean useExternalVersion() { 485 if (isEmbedded()) { 486 return localConfig.useExternalVersion(); 487 } 488 return remoteConfig.useExternalVersion(); 489 } 490 491 @Override 492 public boolean isIndexingInProgress() { 493 // impl of scheduling is left to the ESService 494 throw new UnsupportedOperationException("Not implemented"); 495 } 496 497 @Override 498 public ListenableFuture<Boolean> prepareWaitForIndexing() { 499 throw new UnsupportedOperationException("Not implemented"); 500 } 501 502 /** 503 * Get the elastic search indexes for searches 504 */ 505 String[] getSearchIndexes(List<String> searchRepositories) { 506 if (searchRepositories.isEmpty()) { 507 Collection<String> values = indexNames.values(); 508 return values.toArray(new String[values.size()]); 509 } 510 String[] ret = new String[searchRepositories.size()]; 511 int i = 0; 512 for (String repo : searchRepositories) { 513 ret[i++] = getIndexNameForRepository(repo); 514 } 515 return ret; 516 } 517 518 public boolean isReady() { 519 return indexInitDone; 520 } 521 522 String[] getIncludeSourceFields() { 523 return includeSourceFields; 524 } 525 526 String[] getExcludeSourceFields() { 527 return excludeSourceFields; 528 } 529 530 Map<String, String> getRepositoryMap() { 531 return repoNames; 532 } 533 534 /** 535 * Get the list of repository names that have their index created. 536 */ 537 public List<String> getInitializedRepositories() { 538 return repositoryInitialized; 539 } 540}