001/* 002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.LinkedHashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.NoSuchElementException; 035import java.util.Set; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.elasticsearch.api.ESClient; 042import org.nuxeo.elasticsearch.api.ESClientFactory; 043import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 044import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 045import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 046import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 047import org.nuxeo.runtime.api.Framework; 048 049import com.google.common.util.concurrent.ListenableFuture; 050 051/** 052 * @since 6.0 053 */ 054public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 055 private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class); 056 057 protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30; 058 059 protected static final int TIMEOUT_DELETE_SECOND = 300; 060 061 protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 062 063 protected final Map<String, String> indexNames = new HashMap<>(); 064 065 protected final Map<String, String> repoNames = new HashMap<>(); 066 067 protected final Map<String, String> writeIndexNames = new HashMap<>(); 068 069 protected final Map<String, ElasticSearchIndexConfig> indexConfig; 070 071 protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig; 072 073 protected final ElasticSearchClientConfig clientConfig; 074 075 protected ElasticSearchEmbeddedNode embeddedServer; 076 077 protected ESClient client; 078 079 protected boolean indexInitDone; 080 081 protected String[] includeSourceFields; 082 083 protected String[] excludeSourceFields; 084 085 protected List<String> repositoryInitialized = new ArrayList<>(); 086 087 /** 088 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 089 * The transport client initialization can be customized. 090 * 091 * @since 9.1 092 */ 093 public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig, 094 ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig) { 095 this.embeddedServerConfig = embeddedServerConfig; 096 this.indexConfig = indexConfig; 097 this.clientConfig = clientConfig; 098 checkConfig(); 099 connect(); 100 initializeIndexes(); 101 } 102 103 protected void checkConfig() { 104 if (clientConfig == null) { 105 throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting"); 106 } 107 } 108 109 protected void connect() { 110 if (client != null) { 111 return; 112 } 113 if (embeddedServerConfig != null) { 114 embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig); 115 embeddedServer.start(); 116 } 117 client = createClient(embeddedServer); 118 checkClusterHealth(); 119 log.info("Elasticsearch Connected"); 120 } 121 122 public void disconnect() { 123 if (client != null) { 124 try { 125 client.close(); 126 } catch (Exception e) { 127 log.error("Failed to close client: " + e.getMessage(), e); 128 } 129 client = null; 130 indexInitDone = false; 131 log.info("Elasticsearch Disconnected"); 132 } 133 if (embeddedServer != null) { 134 try { 135 embeddedServer.close(); 136 } catch (IOException e) { 137 log.error("Failed to close embedded node: " + e.getMessage(), e); 138 } 139 embeddedServer = null; 140 log.info("Elasticsearch embedded Node Stopped"); 141 } 142 } 143 144 protected ESClient createClient(ElasticSearchEmbeddedNode node) { 145 log.info("Connecting to Elasticsearch"); 146 ESClient ret; 147 try { 148 ESClientFactory clientFactory = clientConfig.getKlass().newInstance(); 149 ret = clientFactory.create(node, clientConfig); 150 } catch (ReflectiveOperationException e) { 151 log.error("Cannot instantiate Elasticsearch Client from class: " + clientConfig.getKlass()); 152 throw new NuxeoException(e); 153 } 154 return ret; 155 } 156 157 protected void checkClusterHealth(String... indexNames) { 158 if (client == null) { 159 throw new IllegalStateException("No Elasticsearch Client available"); 160 } 161 client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND); 162 } 163 164 protected void initializeIndexes() { 165 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 166 if (conf.isDocumentIndex()) { 167 log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName()); 168 indexNames.put(conf.getRepositoryName(), conf.getName()); 169 repoNames.put(conf.getName(), conf.getRepositoryName()); 170 Set<String> set = new LinkedHashSet<>(); 171 if (includeSourceFields != null) { 172 set.addAll(Arrays.asList(includeSourceFields)); 173 } 174 set.addAll(Arrays.asList(conf.getIncludes())); 175 if (set.contains(ALL_FIELDS)) { 176 set.clear(); 177 set.add(ALL_FIELDS); 178 } 179 includeSourceFields = set.toArray(new String[set.size()]); 180 set.clear(); 181 if (excludeSourceFields != null) { 182 set.addAll(Arrays.asList(excludeSourceFields)); 183 } 184 set.addAll(Arrays.asList(conf.getExcludes())); 185 excludeSourceFields = set.toArray(new String[set.size()]); 186 } 187 188 } 189 initIndexes(false); 190 } 191 192 // Admin Impl ============================================================= 193 @Override 194 public void refreshRepositoryIndex(String repositoryName) { 195 if (log.isDebugEnabled()) { 196 log.debug("Refreshing index associated with repo: " + repositoryName); 197 } 198 getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName))); 199 if (log.isDebugEnabled()) { 200 log.debug("Refreshing index done"); 201 } 202 } 203 204 @Override 205 public String getIndexNameForRepository(String repositoryName) { 206 String ret = indexNames.get(repositoryName); 207 if (ret == null) { 208 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 209 } 210 return ret; 211 } 212 213 @Override 214 public String getRepositoryForIndex(String indexName) { 215 return repoNames.get(indexName); 216 } 217 218 @Override 219 public List<String> getIndexNamesForType(String type) { 220 List<String> indexNames = new ArrayList<>(); 221 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 222 if (type.equals(conf.getType())) { 223 indexNames.add(conf.getName()); 224 } 225 } 226 return indexNames; 227 } 228 229 @Override 230 public String getIndexNameForType(String type) { 231 List<String> indexNames = getIndexNamesForType(type); 232 if (indexNames.isEmpty()) { 233 throw new NoSuchElementException("No index defined for type: " + type); 234 } 235 return indexNames.get(0); 236 } 237 238 @Override 239 public String getWriteIndexName(String searchIndexName) { 240 return writeIndexNames.getOrDefault(searchIndexName, searchIndexName); 241 } 242 243 @Override 244 public void syncSearchAndWriteAlias(String searchIndexName) { 245 ElasticSearchIndexConfig conf = indexConfig.values() 246 .stream() 247 .filter(item -> item.getName().equals(searchIndexName)) 248 .findFirst() 249 .orElseThrow(IllegalStateException::new); 250 syncSearchAndWriteAlias(conf); 251 } 252 253 @Override 254 public void flushRepositoryIndex(String repositoryName) { 255 log.warn("Flushing index associated with repo: " + repositoryName); 256 getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName))); 257 log.info("Flushing index done"); 258 } 259 260 @Override 261 public void refresh() { 262 for (String repositoryName : indexNames.keySet()) { 263 refreshRepositoryIndex(repositoryName); 264 } 265 } 266 267 @Override 268 public void flush() { 269 for (String repositoryName : indexNames.keySet()) { 270 flushRepositoryIndex(repositoryName); 271 } 272 } 273 274 @Override 275 public void optimizeIndex(String indexName) { 276 log.warn("Optimizing index: " + indexName); 277 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 278 if (conf.getName().equals(indexName)) { 279 getClient().optimize(indexName); 280 } 281 } 282 log.info("Optimize done"); 283 } 284 285 @Override 286 public void optimizeRepositoryIndex(String repositoryName) { 287 optimizeIndex(getIndexNameForRepository(repositoryName)); 288 } 289 290 @Override 291 public void optimize() { 292 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 293 optimizeIndex(conf.getName()); 294 } 295 } 296 297 @Override 298 public ESClient getClient() { 299 return client; 300 } 301 302 @Override 303 public void initIndexes(boolean dropIfExists) { 304 indexInitDone = false; 305 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 306 initIndex(conf, dropIfExists); 307 } 308 log.info("Elasticsearch Service ready"); 309 indexInitDone = true; 310 } 311 312 @Override 313 public void dropAndInitIndex(String indexName) { 314 log.info("Drop and init index: " + indexName); 315 indexInitDone = false; 316 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 317 if (conf.getName().equals(indexName)) { 318 initIndex(conf, true); 319 } 320 } 321 indexInitDone = true; 322 } 323 324 @Override 325 public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) { 326 log.info("Drop and init index of repository: " + repositoryName); 327 indexInitDone = false; 328 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 329 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 330 initIndex(conf, true, syncAlias); 331 } 332 } 333 indexInitDone = true; 334 } 335 336 @Override 337 public List<String> getRepositoryNames() { 338 return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet())); 339 } 340 341 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 342 initIndex(conf, dropIfExists, true); 343 } 344 345 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) { 346 if (conf.manageAlias()) { 347 initWriteAlias(conf, dropIfExists); 348 initSearchAlias(conf); 349 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 350 if (syncAlias) { 351 syncSearchAndWriteAlias(conf); 352 } 353 } else if (conf.hasExplicitWriteIndex()) { 354 initIndex(conf.writeIndexOrAlias(), conf, dropIfExists); 355 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 356 } else { 357 initIndex(conf.getName(), conf, dropIfExists); 358 writeIndexNames.put(conf.getName(), conf.getName()); 359 } 360 } 361 362 protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) { 363 // init the write index and alias 364 String writeAlias = conf.writeIndexOrAlias(); 365 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 366 String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex); 367 if (writeIndex != null && !dropIfExists) { 368 // alias exists make sure the index is well configured 369 initIndex(writeIndex, conf, false); 370 } else { 371 // create a new write index and update the alias, we don't drop anything 372 if (getClient().indexExists(nextWriteIndex)) { 373 throw new IllegalStateException( 374 String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias)); 375 } 376 initIndex(nextWriteIndex, conf, false); 377 getClient().updateAlias(writeAlias, nextWriteIndex); 378 } 379 } 380 381 protected void initSearchAlias(ElasticSearchIndexConfig conf) { 382 // init the search alias 383 String searchAlias = conf.getName(); 384 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 385 String writeAlias = conf.writeIndexOrAlias(); 386 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 387 if (searchIndex == null) { 388 if (getClient().indexExists(searchAlias)) { 389 if (Framework.isTestModeSet()) { 390 // in test mode we drop an index that have the target alias name 391 getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND); 392 } 393 searchIndex = searchAlias; 394 } else { 395 // search alias is not created, point to the write index 396 getClient().updateAlias(searchAlias, writeIndex); 397 searchIndex = writeIndex; 398 } 399 } 400 log.info(String.format("Managed index aliases: Alias: %s -> index: %s, alias: %s -> index: %s", searchAlias, 401 searchIndex, writeAlias, writeIndex)); 402 } 403 404 /** 405 * Update the search index to point to the write index. 406 */ 407 protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) { 408 if (!conf.manageAlias()) { 409 return; 410 } 411 String searchAlias = conf.getName(); 412 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 413 String writeAlias = conf.writeIndexOrAlias(); 414 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 415 if (!writeIndex.equals(searchIndex)) { 416 log.warn(String.format("Updating search alias %s->%s (previously %s)", searchAlias, writeIndex, 417 searchIndex)); 418 getClient().updateAlias(searchAlias, writeIndex); 419 searchIndex = writeIndex; 420 } 421 if (searchIndex != null) { 422 repoNames.put(searchIndex, conf.getRepositoryName()); 423 } 424 } 425 426 protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) { 427 if (!conf.mustCreate()) { 428 return; 429 } 430 log.info(String.format("Initialize index: %s with conf: %s, type: %s", indexName, conf.getName(), 431 conf.getType())); 432 boolean mappingExists = false; 433 boolean indexExists = getClient().indexExists(indexName); 434 if (indexExists) { 435 if (!dropIfExists) { 436 log.debug("Index " + indexName + " already exists"); 437 mappingExists = getClient().mappingExists(indexName, conf.getType()); 438 if (conf.isDocumentIndex()) { 439 // Check if the index is actually an alias. 440 String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName()); 441 if (realIndexForAlias != null) { 442 repoNames.put(realIndexForAlias, conf.getRepositoryName()); 443 } 444 } 445 } else { 446 if (!Framework.isTestModeSet()) { 447 log.warn(String.format( 448 "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index", 449 indexName, conf.getType())); 450 } 451 getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND); 452 indexExists = false; 453 } 454 } 455 if (!indexExists) { 456 log.info(String.format("Creating index: %s", indexName)); 457 if (log.isDebugEnabled()) { 458 log.debug("Using settings: " + conf.getSettings()); 459 } 460 getClient().createIndex(indexName, conf.getSettings()); 461 } 462 if (!mappingExists) { 463 log.info(String.format("Creating mapping type: %s on index: %s", indexName, conf.getName())); 464 if (log.isDebugEnabled()) { 465 log.debug("Using mapping: " + conf.getMapping()); 466 } 467 getClient().createMapping(indexName, conf.getType(), conf.getMapping()); 468 if (!dropIfExists && conf.getRepositoryName() != null) { 469 repositoryInitialized.add(conf.getRepositoryName()); 470 } 471 } 472 // make sure the index is ready before returning 473 checkClusterHealth(indexName); 474 } 475 476 @Override 477 public long getPendingWorkerCount() { 478 // impl of scheduling is left to the ESService 479 throw new UnsupportedOperationException("Not implemented"); 480 } 481 482 @Override 483 public long getRunningWorkerCount() { 484 // impl of scheduling is left to the ESService 485 throw new UnsupportedOperationException("Not implemented"); 486 } 487 488 @Override 489 public int getTotalCommandProcessed() { 490 return totalCommandProcessed.get(); 491 } 492 493 @Override 494 public boolean isEmbedded() { 495 return embeddedServer != null; 496 } 497 498 @Override 499 public boolean useExternalVersion() { 500 return clientConfig.useExternalVersion(); 501 } 502 503 @Override 504 public boolean isIndexingInProgress() { 505 // impl of scheduling is left to the ESService 506 throw new UnsupportedOperationException("Not implemented"); 507 } 508 509 @Override 510 public ListenableFuture<Boolean> prepareWaitForIndexing() { 511 throw new UnsupportedOperationException("Not implemented"); 512 } 513 514 /** 515 * Get the elastic search indexes for searches 516 */ 517 protected String[] getSearchIndexes(List<String> searchRepositories) { 518 if (searchRepositories.isEmpty()) { 519 Collection<String> values = indexNames.values(); 520 return values.toArray(new String[values.size()]); 521 } 522 String[] ret = new String[searchRepositories.size()]; 523 int i = 0; 524 for (String repo : searchRepositories) { 525 ret[i++] = getIndexNameForRepository(repo); 526 } 527 return ret; 528 } 529 530 public boolean isReady() { 531 return indexInitDone; 532 } 533 534 protected String[] getIncludeSourceFields() { 535 return includeSourceFields; 536 } 537 538 protected String[] getExcludeSourceFields() { 539 return excludeSourceFields; 540 } 541 542 protected Map<String, String> getRepositoryMap() { 543 return repoNames; 544 } 545 546 /** 547 * Get the list of repository names that have their index created. 548 */ 549 public List<String> getInitializedRepositories() { 550 return repositoryInitialized; 551 } 552}