001/* 002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.LinkedHashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.NoSuchElementException; 035import java.util.Set; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.elasticsearch.api.ESClient; 042import org.nuxeo.elasticsearch.api.ESClientFactory; 043import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 044import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 045import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 046import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 047import org.nuxeo.runtime.api.Framework; 048 049import com.google.common.util.concurrent.ListenableFuture; 050 051/** 052 * @since 6.0 053 */ 054public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 055 private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class); 056 057 protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30; 058 059 protected static final int TIMEOUT_DELETE_SECOND = 300; 060 061 protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 062 063 protected final Map<String, String> indexNames = new HashMap<>(); 064 065 protected final Map<String, String> repoNames = new HashMap<>(); 066 067 protected final Map<String, String> writeIndexNames = new HashMap<>(); 068 069 protected final Map<String, ElasticSearchIndexConfig> indexConfig; 070 071 protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig; 072 073 protected final ElasticSearchClientConfig clientConfig; 074 075 protected ElasticSearchEmbeddedNode embeddedServer; 076 077 protected ESClient client; 078 079 protected boolean indexInitDone; 080 081 protected String[] includeSourceFields; 082 083 protected String[] excludeSourceFields; 084 085 protected List<String> repositoryInitialized = new ArrayList<>(); 086 087 /** 088 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 089 * The transport client initialization can be customized. 090 * 091 * @since 9.1 092 */ 093 public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig, 094 ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig) { 095 this.embeddedServerConfig = embeddedServerConfig; 096 this.indexConfig = indexConfig; 097 this.clientConfig = clientConfig; 098 checkConfig(); 099 connect(); 100 initializeIndexes(); 101 } 102 103 protected void checkConfig() { 104 if (clientConfig == null) { 105 throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting"); 106 } 107 } 108 109 protected void connect() { 110 if (client != null) { 111 return; 112 } 113 if (embeddedServerConfig != null) { 114 embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig); 115 embeddedServer.start(); 116 } 117 client = createClient(embeddedServer); 118 checkClusterHealth(); 119 log.info("Elasticsearch Connected"); 120 } 121 122 public void disconnect() { 123 if (client != null) { 124 try { 125 client.close(); 126 } catch (Exception e) { 127 log.error("Failed to close client: " + e.getMessage(), e); 128 } 129 client = null; 130 indexInitDone = false; 131 log.info("Elasticsearch Disconnected"); 132 } 133 if (embeddedServer != null) { 134 try { 135 embeddedServer.close(); 136 } catch (IOException e) { 137 log.error("Failed to close embedded node: " + e.getMessage(), e); 138 } 139 embeddedServer = null; 140 log.info("Elasticsearch embedded Node Stopped"); 141 } 142 } 143 144 protected ESClient createClient(ElasticSearchEmbeddedNode node) { 145 log.info("Connecting to Elasticsearch"); 146 ESClient ret; 147 try { 148 ESClientFactory clientFactory = clientConfig.getKlass().newInstance(); 149 ret = clientFactory.create(node, clientConfig); 150 } catch (ReflectiveOperationException e) { 151 log.error("Cannot instantiate Elasticsearch Client from class: " + clientConfig.getKlass()); 152 throw new NuxeoException(e); 153 } 154 return ret; 155 } 156 157 protected void checkClusterHealth(String... indexNames) { 158 if (client == null) { 159 throw new IllegalStateException("No Elasticsearch Client available"); 160 } 161 client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND); 162 } 163 164 protected void initializeIndexes() { 165 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 166 if (conf.isDocumentIndex()) { 167 log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName()); 168 indexNames.put(conf.getRepositoryName(), conf.getName()); 169 repoNames.put(conf.getName(), conf.getRepositoryName()); 170 Set<String> set = new LinkedHashSet<>(); 171 if (includeSourceFields != null) { 172 set.addAll(Arrays.asList(includeSourceFields)); 173 } 174 set.addAll(Arrays.asList(conf.getIncludes())); 175 if (set.contains(ALL_FIELDS)) { 176 set.clear(); 177 set.add(ALL_FIELDS); 178 } 179 includeSourceFields = set.toArray(new String[set.size()]); 180 set.clear(); 181 if (excludeSourceFields != null) { 182 set.addAll(Arrays.asList(excludeSourceFields)); 183 } 184 set.addAll(Arrays.asList(conf.getExcludes())); 185 excludeSourceFields = set.toArray(new String[set.size()]); 186 } 187 188 } 189 initIndexes(false); 190 } 191 192 // Admin Impl ============================================================= 193 @Override 194 public void refreshRepositoryIndex(String repositoryName) { 195 if (log.isDebugEnabled()) { 196 log.debug("Refreshing index associated with repo: " + repositoryName); 197 } 198 getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName))); 199 if (log.isDebugEnabled()) { 200 log.debug("Refreshing index done"); 201 } 202 } 203 204 @Override 205 public String getIndexNameForRepository(String repositoryName) { 206 String ret = indexNames.get(repositoryName); 207 if (ret == null) { 208 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 209 } 210 return ret; 211 } 212 213 @Override 214 public String getRepositoryForIndex(String indexName) { 215 return repoNames.get(indexName); 216 } 217 218 @Override 219 public List<String> getIndexNamesForType(String type) { 220 List<String> indexNames = new ArrayList<>(); 221 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 222 if (type.equals(conf.getType())) { 223 indexNames.add(conf.getName()); 224 } 225 } 226 return indexNames; 227 } 228 229 @Override 230 public String getIndexNameForType(String type) { 231 List<String> indexNames = getIndexNamesForType(type); 232 if (indexNames.isEmpty()) { 233 throw new NoSuchElementException("No index defined for type: " + type); 234 } 235 return indexNames.get(0); 236 } 237 238 @Override 239 public String getWriteIndexName(String searchIndexName) { 240 return writeIndexNames.getOrDefault(searchIndexName, searchIndexName); 241 } 242 243 @Override 244 public void syncSearchAndWriteAlias(String searchIndexName) { 245 ElasticSearchIndexConfig conf = indexConfig.values() 246 .stream() 247 .filter(item -> item.getName().equals(searchIndexName)) 248 .findFirst() 249 .orElseThrow(IllegalStateException::new); 250 syncSearchAndWriteAlias(conf); 251 } 252 253 @Override 254 public void flushRepositoryIndex(String repositoryName) { 255 log.warn("Flushing index associated with repo: " + repositoryName); 256 getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName))); 257 log.info("Flushing index done"); 258 } 259 260 @Override 261 public void refresh() { 262 for (String repositoryName : indexNames.keySet()) { 263 refreshRepositoryIndex(repositoryName); 264 } 265 } 266 267 @Override 268 public void flush() { 269 for (String repositoryName : indexNames.keySet()) { 270 flushRepositoryIndex(repositoryName); 271 } 272 } 273 274 @Override 275 public void optimizeIndex(String indexName) { 276 log.warn("Optimizing index: " + indexName); 277 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 278 if (conf.getName().equals(indexName)) { 279 getClient().optimize(indexName); 280 } 281 } 282 log.info("Optimize done"); 283 } 284 285 @Override 286 public void optimizeRepositoryIndex(String repositoryName) { 287 optimizeIndex(getIndexNameForRepository(repositoryName)); 288 } 289 290 @Override 291 public void optimize() { 292 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 293 optimizeIndex(conf.getName()); 294 } 295 } 296 297 @Override 298 public ESClient getClient() { 299 return client; 300 } 301 302 @Override 303 public void initIndexes(boolean dropIfExists) { 304 indexInitDone = false; 305 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 306 initIndex(conf, dropIfExists); 307 } 308 log.info("Elasticsearch Service ready"); 309 indexInitDone = true; 310 } 311 312 @Override 313 public void dropAndInitIndex(String indexName) { 314 log.info("Drop and init index: " + indexName); 315 indexInitDone = false; 316 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 317 if (conf.getName().equals(indexName)) { 318 initIndex(conf, true); 319 } 320 } 321 indexInitDone = true; 322 } 323 324 @Override 325 public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) { 326 log.info("Drop and init index of repository: " + repositoryName); 327 indexInitDone = false; 328 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 329 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 330 initIndex(conf, true, syncAlias); 331 } 332 } 333 indexInitDone = true; 334 } 335 336 @Override 337 public List<String> getRepositoryNames() { 338 return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet())); 339 } 340 341 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 342 initIndex(conf, dropIfExists, true); 343 } 344 345 protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) { 346 if (conf.manageAlias()) { 347 initWriteAlias(conf, dropIfExists); 348 initSearchAlias(conf); 349 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 350 if (syncAlias) { 351 syncSearchAndWriteAlias(conf); 352 } 353 } else if (conf.hasExplicitWriteIndex()) { 354 initIndex(conf.writeIndexOrAlias(), conf, dropIfExists); 355 writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias()); 356 } else { 357 initIndex(conf.getName(), conf, dropIfExists); 358 writeIndexNames.put(conf.getName(), conf.getName()); 359 } 360 } 361 362 protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) { 363 // init the write index and alias 364 String writeAlias = conf.writeIndexOrAlias(); 365 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 366 String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex); 367 if (writeIndex != null && !dropIfExists) { 368 // alias exists make sure the index is well configured 369 initIndex(writeIndex, conf, false); 370 } else { 371 // create a new write index and update the alias, we don't drop anything 372 if (getClient().indexExists(nextWriteIndex)) { 373 throw new IllegalStateException( 374 String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias)); 375 } 376 initIndex(nextWriteIndex, conf, false); 377 getClient().updateAlias(writeAlias, nextWriteIndex); 378 } 379 } 380 381 protected void initSearchAlias(ElasticSearchIndexConfig conf) { 382 // init the search alias 383 String searchAlias = conf.getName(); 384 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 385 String writeAlias = conf.writeIndexOrAlias(); 386 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 387 if (searchIndex == null) { 388 if (getClient().indexExists(searchAlias)) { 389 if (Framework.isTestModeSet()) { 390 // in test mode we drop an index that have the target alias name 391 getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND); 392 } 393 searchIndex = searchAlias; 394 } else { 395 // search alias is not created, point to the write index 396 getClient().updateAlias(searchAlias, writeIndex); 397 searchIndex = writeIndex; 398 } 399 } 400 log.info(String.format("Managed index aliases: Alias: %s -> index: %s, alias: %s -> index: %s", searchAlias, searchIndex, writeAlias, writeIndex)); 401 } 402 403 /** 404 * Update the search index to point to the write index. 405 */ 406 protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) { 407 if (!conf.manageAlias()) { 408 return; 409 } 410 String searchAlias = conf.getName(); 411 String searchIndex = getClient().getFirstIndexForAlias(searchAlias); 412 String writeAlias = conf.writeIndexOrAlias(); 413 String writeIndex = getClient().getFirstIndexForAlias(writeAlias); 414 if (!writeIndex.equals(searchIndex)) { 415 log.warn(String.format("Updating search alias %s->%s (previously %s)", searchAlias, writeIndex, searchIndex)); 416 getClient().updateAlias(searchAlias, writeIndex); 417 searchIndex = writeIndex; 418 } 419 if (searchIndex != null) { 420 repoNames.put(searchIndex, conf.getRepositoryName()); 421 } 422 } 423 424 protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) { 425 if (!conf.mustCreate()) { 426 return; 427 } 428 log.info(String.format("Initialize index: %s with conf: %s, type: %s", indexName, conf.getName(), 429 conf.getType())); 430 boolean mappingExists = false; 431 boolean indexExists = getClient().indexExists(indexName); 432 if (indexExists) { 433 if (!dropIfExists) { 434 log.debug("Index " + indexName + " already exists"); 435 mappingExists = getClient().mappingExists(indexName, conf.getType()); 436 if (conf.isDocumentIndex()) { 437 //Check if the index is actually an alias. 438 String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName()); 439 if (realIndexForAlias != null) { 440 repoNames.put(realIndexForAlias, conf.getRepositoryName()); 441 } 442 } 443 } else { 444 if (!Framework.isTestModeSet()) { 445 log.warn(String.format( 446 "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index", 447 indexName, conf.getType())); 448 } 449 getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND); 450 indexExists = false; 451 } 452 } 453 if (!indexExists) { 454 log.info(String.format("Creating index: %s", indexName)); 455 if (log.isDebugEnabled()) { 456 log.debug("Using settings: " + conf.getSettings()); 457 } 458 getClient().createIndex(indexName, conf.getSettings()); 459 } 460 if (!mappingExists) { 461 log.info(String.format("Creating mapping type: %s on index: %s", indexName, conf.getName())); 462 if (log.isDebugEnabled()) { 463 log.debug("Using mapping: " + conf.getMapping()); 464 } 465 getClient().createMapping(indexName, conf.getType(), conf.getMapping()); 466 if (!dropIfExists && conf.getRepositoryName() != null) { 467 repositoryInitialized.add(conf.getRepositoryName()); 468 } 469 } 470 // make sure the index is ready before returning 471 checkClusterHealth(indexName); 472 } 473 474 @Override 475 public long getPendingWorkerCount() { 476 // impl of scheduling is left to the ESService 477 throw new UnsupportedOperationException("Not implemented"); 478 } 479 480 @Override 481 public long getRunningWorkerCount() { 482 // impl of scheduling is left to the ESService 483 throw new UnsupportedOperationException("Not implemented"); 484 } 485 486 @Override 487 public int getTotalCommandProcessed() { 488 return totalCommandProcessed.get(); 489 } 490 491 @Override 492 public boolean isEmbedded() { 493 return embeddedServer != null; 494 } 495 496 @Override 497 public boolean useExternalVersion() { 498 return clientConfig.useExternalVersion(); 499 } 500 501 @Override 502 public boolean isIndexingInProgress() { 503 // impl of scheduling is left to the ESService 504 throw new UnsupportedOperationException("Not implemented"); 505 } 506 507 @Override 508 public ListenableFuture<Boolean> prepareWaitForIndexing() { 509 throw new UnsupportedOperationException("Not implemented"); 510 } 511 512 /** 513 * Get the elastic search indexes for searches 514 */ 515 protected String[] getSearchIndexes(List<String> searchRepositories) { 516 if (searchRepositories.isEmpty()) { 517 Collection<String> values = indexNames.values(); 518 return values.toArray(new String[values.size()]); 519 } 520 String[] ret = new String[searchRepositories.size()]; 521 int i = 0; 522 for (String repo : searchRepositories) { 523 ret[i++] = getIndexNameForRepository(repo); 524 } 525 return ret; 526 } 527 528 public boolean isReady() { 529 return indexInitDone; 530 } 531 532 protected String[] getIncludeSourceFields() { 533 return includeSourceFields; 534 } 535 536 protected String[] getExcludeSourceFields() { 537 return excludeSourceFields; 538 } 539 540 protected Map<String, String> getRepositoryMap() { 541 return repoNames; 542 } 543 544 /** 545 * Get the list of repository names that have their index created. 546 */ 547 public List<String> getInitializedRepositories() { 548 return repositoryInitialized; 549 } 550}