001/* 002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.HashMap; 030import java.util.LinkedHashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.NoSuchElementException; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.stream.Collectors; 038 039import org.apache.logging.log4j.LogManager; 040import org.apache.logging.log4j.Logger; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.elasticsearch.api.ESClient; 043import org.nuxeo.elasticsearch.api.ESClientFactory; 044import org.nuxeo.elasticsearch.api.ESHintQueryBuilder; 045import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 046import org.nuxeo.elasticsearch.config.ESHintQueryBuilderDescriptor; 047import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 048import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 049import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 050import org.nuxeo.runtime.api.Framework; 051 052import com.google.common.util.concurrent.ListenableFuture; 053 054/** 055 * @since 6.0 056 */ 057public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 058 private static final Logger log = LogManager.getLogger(ElasticSearchAdminImpl.class); 059 060 protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30; 061 062 protected static final int TIMEOUT_DELETE_SECOND = 300; 063 064 protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 065 066 protected final Map<String, String> indexNames = new HashMap<>(); 067 068 protected final Map<String, String> repoNames = new HashMap<>(); 069 070 protected final Map<String, String> writeIndexNames = new HashMap<>(); 071 072 protected final Map<String, ElasticSearchIndexConfig> indexConfig; 073 074 protected Map<String, ESHintQueryBuilder> hints; 075 076 protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig; 077 078 protected final ElasticSearchClientConfig clientConfig; 079 080 protected ElasticSearchEmbeddedNode embeddedServer; 081 082 protected ESClient client; 083 084 protected boolean indexInitDone; 085 086 protected String[] includeSourceFields; 087 088 protected String[] excludeSourceFields; 089 090 protected List<String> repositoryInitialized = new ArrayList<>(); 091 092 /** 093 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 094 * The transport client initialization can be customized. 095 * 096 * @since 9.1 097 */ 098 public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig, 099 ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig, 100 Collection<ESHintQueryBuilderDescriptor> hintDescriptors) { 101 this.embeddedServerConfig = embeddedServerConfig; 102 this.indexConfig = indexConfig; 103 this.clientConfig = clientConfig; 104 this.hints = hintDescriptors.stream() 105 .collect(Collectors.toMap(ESHintQueryBuilderDescriptor::getName, 106 ESHintQueryBuilderDescriptor::newInstance)); 107 checkConfig(); 108 connect(); 109 initializeIndexes(); 110 } 111 112 protected void checkConfig() { 113 if (clientConfig == null) { 114 throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting"); 115 } 116 } 117 118 protected void connect() { 119 if (client != null) { 120 return; 121 } 122 if (embeddedServerConfig != null) { 123 embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig); 124 embeddedServer.start(); 125 } 126 client = createClient(embeddedServer); 127 try { 128 checkClusterHealth(); 129 log.info("Elasticsearch Connected"); 130 } catch (Exception e) { 131 disconnect(); 132 throw new IllegalStateException("Unable to check cluster health", e); 133 } 134 } 135 136 public void disconnect() { 137 if (client != null) { 138 try { 139 client.close(); 140 } catch (Exception e) { 141 log.error("Failed to close client: " + e.getMessage(), e); 142 } 143 client = null; 144 indexInitDone = false; 145 log.info("Elasticsearch Disconnected"); 146 } 147 if (embeddedServer != null) { 148 try { 149 embeddedServer.close(); 150 } catch (IOException e) { 151 log.error("Failed to close embedded node: {}", e.getMessage(), e); 152 } 153 embeddedServer = null; 154 log.info("Elasticsearch embedded Node Stopped"); 155 } 156 } 157 158 protected ESClient createClient(ElasticSearchEmbeddedNode node) { 159 log.info("Connecting to Elasticsearch"); 160 ESClient ret; 161 try { 162 ESClientFactory clientFactory = clientConfig.getKlass().getDeclaredConstructor().newInstance(); 163 ret = clientFactory.create(node, clientConfig); 164 } catch (ReflectiveOperationException e) { 165 log.error("Cannot instantiate Elasticsearch Client from class: {}", clientConfig::getKlass); 166 throw new NuxeoException(e); 167 } 168 return ret; 169 } 170 171 protected void checkClusterHealth(String... indexNames) { 172 if (client == null) { 173 throw new IllegalStateException("No Elasticsearch Client available"); 174 } 175 client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND); 176 } 177 178 protected void initializeIndexes() { 179 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 180 if (conf.isDocumentIndex()) { 181 log.info("Associate index: {} with repository: {}", conf::getName, conf::getRepositoryName); 182 indexNames.put(conf.getRepositoryName(), conf.getName()); 183 repoNames.put(conf.getName(), conf.getRepositoryName()); 184 Set<String> set = new LinkedHashSet<>(); 185 if (includeSourceFields != null) { 186 set.addAll(Arrays.asList(includeSourceFields)); 187 } 188 set.addAll(Arrays.asList(conf.getIncludes())); 189 if (set.contains(ALL_FIELDS)) { 190 set.clear(); 191 set.add(ALL_FIELDS); 192 } 193 includeSourceFields = set.toArray(new String[0]); 194 set.clear(); 195 if (excludeSourceFields != null) { 196 set.addAll(Arrays.asList(excludeSourceFields)); 197 } 198 set.addAll(Arrays.asList(conf.getExcludes())); 199 excludeSourceFields = set.toArray(new String[0]); 200 } 201 202 } 203 initIndexes(false); 204 } 205 206 // Admin Impl ============================================================= 207 @Override 208 public void refreshRepositoryIndex(String repositoryName) { 209 log.debug("Refreshing index associated with repo: {}", repositoryName); 210 getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName))); 211 log.debug("Refreshing index done"); 212 } 213 214 @Override 215 public String getIndexNameForRepository(String repositoryName) { 216 String ret = indexNames.get(repositoryName); 217 if (ret == null) { 218 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 219 } 220 return ret; 221 } 222 223 @Override 224 public String getRepositoryForIndex(String indexName) { 225 return repoNames.get(indexName); 226 } 227 228 @Override 229 public List<String> getIndexNamesForType(String type) { 230 List<String> indexNames = new ArrayList<>(); 231 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 232 if (type.equals(conf.getType())) { 233 indexNames.add(conf.getName()); 234 } 235 } 236 return indexNames; 237 } 238 239 @Override 240 public String getIndexNameForType(String type) { 241 List<String> indexNames = getIndexNamesForType(type); 242 if (indexNames.isEmpty()) { 243 throw new NoSuchElementException("No index defined for type: " + type); 244 } 245 return indexNames.get(0); 246 } 247 248 @Override 249 public String getWriteIndexName(String searchIndexName) { 250 return writeIndexNames.getOrDefault(searchIndexName, searchIndexName); 251 } 252 253 @Override 254 public void syncSearchAndWriteAlias(String searchIndexName) { 255 ElasticSearchIndexConfig conf = indexConfig.values() 256 .stream() 257 .filter(item -> item.getName().equals(searchIndexName)) 258 .findFirst() 259 .orElseThrow(IllegalStateException::new); 260 syncSearchAndWriteAlias(conf); 261 } 262 263 @Override 264 public void flushRepositoryIndex(String repositoryName) { 265 log.warn("Flushing index associated with repo: {}", repositoryName); 266 getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName))); 267 log.info("Flushing index done"); 268 } 269 270 @Override 271 public void refresh() { 272 for (String repositoryName : indexNames.keySet()) { 273 refreshRepositoryIndex(repositoryName); 274 } 275 } 276 277 @Override 278 public void flush() { 279 for (String repositoryName : indexNames.keySet()) { 280 flushRepositoryIndex(repositoryName); 281 } 282 } 283 284 @Override 285 public void optimizeIndex(String indexName) { 286 log.warn("Optimizing index: {}", indexName); 287 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 288 if (conf.getName().equals(indexName)) { 289 getClient().optimize(indexName); 290 } 291 } 292 log.info("Optimize done"); 293 } 294 295 @Override 296 public void optimizeRepositoryIndex(String repositoryName) { 297 optimizeIndex(getIndexNameForRepository(repositoryName)); 298 } 299 300 @Override 301 public void optimize() { 302 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 303 optimizeIndex(conf.getName()); 304 } 305 } 306 307 @Override 308 public ESClient getClient() { 309 return client; 310 } 311 312 @Override 313 public void initIndexes(boolean dropIfExists) { 314 indexInitDone = false; 315 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 316 initIndex(conf, dropIfExists); 317 } 318 log.info("Elasticsearch Service ready"); 319 indexInitDone = true; 320 } 321 322 @Override 323 public void dropAndInitIndex(String indexName) { 324 log.info("Drop and init index: {}", indexName); 325 indexInitDone = false; 326 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 327 if (conf.getName().equals(indexName)) { 328 initIndex(conf, true); 329 } 330 } 331 indexInitDone = true; 332 } 333 334 @Override 335 public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) { 336 log.info("Drop and init index of repository: {}", repositoryName); 337 indexInitDone = false; 338 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 339 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 340 initIndex(conf, true, syncAlias); 341 } 342 } 343 indexInitDone = true; 344 } 345 346 @Override 347 public List<String> getRepositoryNames() { 348 return List.copyOf(indexNames.keySet()); 349 } 350 351 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 352 initIndex(conf, dropIfExists, true); 353 } 354 355 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) { 356 if (conf.manageAlias()) { 357 initWriteAlias(conf, dropIfExists); 358 initSearchAlias(conf); 359 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 360 if (syncAlias) { 361 syncSearchAndWriteAlias(conf); 362 } 363 } else if (conf.hasExplicitWriteIndex()) { 364 initIndex(conf.writeIndexOrAlias(), conf, dropIfExists); 365 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 366 } else { 367 initIndex(conf.getName(), conf, dropIfExists); 368 writeIndexNames.put(conf.getName(), conf.getName()); 369 } 370 } 371 372 protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) { 373 // init the write index and alias 374 String writeAlias = conf.writeIndexOrAlias(); 375 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 376 String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex); 377 if (writeIndex != null && !dropIfExists) { 378 // alias exists make sure the index is well configured 379 initIndex(writeIndex, conf, false); 380 } else { 381 // create a new write index and update the alias, we don't drop anything 382 if (getClient().indexExists(nextWriteIndex)) { 383 throw new IllegalStateException( 384 String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias)); 385 } 386 initIndex(nextWriteIndex, conf, false); 387 getClient().updateAlias(writeAlias, nextWriteIndex); 388 } 389 } 390 391 protected void initSearchAlias(ElasticSearchIndexConfig conf) { 392 // init the search alias 393 String searchAlias = conf.getName(); 394 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 395 String writeAlias = conf.writeIndexOrAlias(); 396 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 397 if (searchIndex == null) { 398 if (getClient().indexExists(searchAlias)) { 399 if (Framework.isTestModeSet()) { 400 // in test mode we drop an index that have the target alias name 401 getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND); 402 } 403 searchIndex = searchAlias; 404 } else { 405 // search alias is not created, point to the write index 406 getClient().updateAlias(searchAlias, writeIndex); 407 searchIndex = writeIndex; 408 } 409 } 410 log.info("Managed index aliases: Alias: {} -> index: {}, alias: {} -> index: {}", searchAlias, searchIndex, 411 writeAlias, writeIndex); 412 } 413 414 /** 415 * Update the search index to point to the write index. 416 */ 417 protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) { 418 if (!conf.manageAlias()) { 419 return; 420 } 421 String searchAlias = conf.getName(); 422 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 423 String writeAlias = conf.writeIndexOrAlias(); 424 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 425 if (!writeIndex.equals(searchIndex)) { 426 log.warn("Updating search alias {} -> {} (previously {})", searchAlias, writeIndex, searchIndex); 427 getClient().updateAlias(searchAlias, writeIndex); 428 searchIndex = writeIndex; 429 } 430 repoNames.put(searchIndex, conf.getRepositoryName()); 431 } 432 433 protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) { 434 if (!conf.mustCreate()) { 435 return; 436 } 437 log.info("Initialize index: {} with conf: {}, type: {}", indexName, conf.getName(), conf.getType()); 438 boolean mappingExists = false; 439 boolean indexExists = getClient().indexExists(indexName); 440 if (indexExists) { 441 if (!dropIfExists) { 442 log.debug("Index: {} already exists", indexName); 443 mappingExists = getClient().mappingExists(indexName, conf.getType()); 444 if (conf.isDocumentIndex()) { 445 // Check if the index is actually an alias. 446 String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName()); 447 if (realIndexForAlias != null) { 448 repoNames.put(realIndexForAlias, conf.getRepositoryName()); 449 } 450 } 451 } else { 452 if (!Framework.isTestModeSet()) { 453 log.warn("Initializing index: {}, type: {} with dropIfExists flag, deleting an existing index", 454 indexName, conf.getType()); 455 } 456 getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND); 457 indexExists = false; 458 } 459 } 460 if (!indexExists) { 461 log.info("Creating index: {}", indexName); 462 log.debug("Using settings: {}", conf::getSettings); 463 getClient().createIndex(indexName, conf.getSettings()); 464 } 465 if (!mappingExists) { 466 log.info("Creating mapping type: {} on index: {}", indexName, conf.getName()); 467 log.debug("Using mapping: {}", conf::getMapping); 468 getClient().createMapping(indexName, conf.getType(), conf.getMapping()); 469 if (!dropIfExists && conf.getRepositoryName() != null) { 470 repositoryInitialized.add(conf.getRepositoryName()); 471 } 472 } 473 // make sure the index is ready before returning 474 checkClusterHealth(indexName); 475 } 476 477 @Override 478 public long getPendingWorkerCount() { 479 // impl of scheduling is left to the ESService 480 throw new UnsupportedOperationException("Not implemented"); 481 } 482 483 @Override 484 public long getRunningWorkerCount() { 485 // impl of scheduling is left to the ESService 486 throw new UnsupportedOperationException("Not implemented"); 487 } 488 489 @Override 490 public int getTotalCommandProcessed() { 491 return totalCommandProcessed.get(); 492 } 493 494 @Override 495 public boolean isEmbedded() { 496 return embeddedServer != null; 497 } 498 499 @Override 500 public boolean useExternalVersion() { 501 return clientConfig.useExternalVersion(); 502 } 503 504 @Override 505 public boolean isIndexingInProgress() { 506 // impl of scheduling is left to the ESService 507 throw new UnsupportedOperationException("Not implemented"); 508 } 509 510 @Override 511 public ListenableFuture<Boolean> prepareWaitForIndexing() { 512 throw new UnsupportedOperationException("Not implemented"); 513 } 514 515 /** 516 * Get the elastic search indexes for searches 517 */ 518 protected String[] getSearchIndexes(List<String> searchRepositories) { 519 if (searchRepositories.isEmpty()) { 520 Collection<String> values = indexNames.values(); 521 return values.toArray(new String[0]); 522 } 523 String[] ret = new String[searchRepositories.size()]; 524 int i = 0; 525 for (String repo : searchRepositories) { 526 ret[i++] = getIndexNameForRepository(repo); 527 } 528 return ret; 529 } 530 531 public boolean isReady() { 532 return indexInitDone; 533 } 534 535 protected String[] getIncludeSourceFields() { 536 return includeSourceFields; 537 } 538 539 protected String[] getExcludeSourceFields() { 540 return excludeSourceFields; 541 } 542 543 protected Map<String, String> getRepositoryMap() { 544 return repoNames; 545 } 546 547 /** 548 * Get the list of repository names that have their index created. 549 */ 550 public List<String> getInitializedRepositories() { 551 return repositoryInitialized; 552 } 553 554 @Override 555 public Optional<ESHintQueryBuilder> getHintByOperator(String name) { 556 return Optional.ofNullable(hints.get(name)); 557 } 558}