001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 024 025import java.net.InetAddress; 026import java.net.UnknownHostException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.LinkedHashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.NoSuchElementException; 036import java.util.Set; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 042import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; 043import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 044import org.elasticsearch.client.Client; 045import org.elasticsearch.client.transport.NoNodeAvailableException; 046import org.elasticsearch.client.transport.TransportClient; 047import org.elasticsearch.common.settings.ImmutableSettings; 048import org.elasticsearch.common.settings.ImmutableSettings.Builder; 049import org.elasticsearch.common.settings.Settings; 050import org.elasticsearch.common.transport.InetSocketTransportAddress; 051import org.elasticsearch.node.Node; 052import org.elasticsearch.node.NodeBuilder; 053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 054import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 055import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 056import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 057import org.nuxeo.runtime.api.Framework; 058 059import com.google.common.util.concurrent.ListenableFuture; 060 061/** 062 * @since 6.0 063 */ 064public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 065 private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class); 066 067 private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s"; 068 069 final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 070 071 private final Map<String, String> indexNames = new HashMap<>(); 072 073 private final Map<String, String> repoNames = new HashMap<>(); 074 075 private final Map<String, ElasticSearchIndexConfig> indexConfig; 076 077 private Node localNode; 078 079 private Client client; 080 081 private boolean indexInitDone = false; 082 083 private final ElasticSearchLocalConfig localConfig; 084 085 private final ElasticSearchRemoteConfig remoteConfig; 086 087 private String[] includeSourceFields; 088 089 private String[] excludeSourceFields; 090 091 private boolean embedded = true; 092 093 private List<String> repositoryInitialized = new ArrayList<>(); 094 095 /** 096 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 097 */ 098 public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig, 099 Map<String, ElasticSearchIndexConfig> indexConfig) { 100 this.remoteConfig = remoteConfig; 101 this.localConfig = localConfig; 102 this.indexConfig = indexConfig; 103 connect(); 104 initializeIndexes(); 105 } 106 107 private void connect() { 108 if (client != null) { 109 return; 110 } 111 if (remoteConfig != null) { 112 client = connectToRemote(remoteConfig); 113 embedded = false; 114 } else { 115 localNode = createEmbeddedNode(localConfig); 116 client = connectToEmbedded(); 117 embedded = true; 118 } 119 checkClusterHealth(); 120 log.info("ES Connected"); 121 } 122 123 public void disconnect() { 124 if (client != null) { 125 client.close(); 126 client = null; 127 indexInitDone = false; 128 log.info("ES Disconnected"); 129 } 130 if (localNode != null) { 131 localNode.close(); 132 localNode = null; 133 log.info("ES embedded Node Stopped"); 134 } 135 } 136 137 private Node createEmbeddedNode(ElasticSearchLocalConfig conf) { 138 log.info("ES embedded Node Initializing (local in JVM)"); 139 if (conf == null) { 140 throw new IllegalStateException("No embedded configuration defined"); 141 } 142 if (!Framework.isTestModeSet()) { 143 log.warn("Elasticsearch embedded configuration is ONLY for testing" 144 + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production."); 145 } 146 Builder sBuilder = ImmutableSettings.settingsBuilder(); 147 sBuilder.put("http.enabled", conf.httpEnabled()).put("network.host", conf.getNetworkHost()).put("path.data", 148 conf.getDataPath()).put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put( 149 "cluster.name", conf.getClusterName()).put("node.name", conf.getNodeName()).put( 150 "http.netty.worker_count", 4).put("http.cors.enabled", true).put( 151 "cluster.routing.allocation.disk.threshold_enabled", false); 152 if (conf.getIndexStorageType() != null) { 153 sBuilder.put("index.store.type", conf.getIndexStorageType()); 154 if (conf.getIndexStorageType().equals("memory")) { 155 sBuilder.put("gateway.type", "none"); 156 } 157 } 158 Settings settings = sBuilder.build(); 159 log.debug("Using settings: " + settings.toDelimitedString(',')); 160 Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node(); 161 assert ret != null : "Can not create an embedded ES Node"; 162 return ret; 163 } 164 165 private Client connectToEmbedded() { 166 log.info("Connecting to embedded ES"); 167 Client ret = localNode.start().client(); 168 assert ret != null : "Can not connect to embedded ES Node"; 169 return ret; 170 } 171 172 private Client connectToRemote(ElasticSearchRemoteConfig config) { 173 log.info("Connecting to remote ES cluster: " + config); 174 Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", config.getClusterName()).put( 175 "client.transport.nodes_sampler_interval", config.getSamplerInterval()).put( 176 "client.transport.ping_timeout", config.getPingTimeout()).put("client.transport.ignore_cluster_name", 177 config.isIgnoreClusterName()).put("client.transport.sniff", config.isClusterSniff()); 178 Settings settings = builder.build(); 179 if (log.isDebugEnabled()) { 180 log.debug("Using settings: " + settings.toDelimitedString(',')); 181 } 182 TransportClient ret = new TransportClient(settings); 183 String[] addresses = config.getAddresses(); 184 if (addresses == null) { 185 log.error("You need to provide an addressList to join a cluster"); 186 } else { 187 for (String item : config.getAddresses()) { 188 String[] address = item.split(":"); 189 log.debug("Add transport address: " + item); 190 try { 191 InetAddress inet = InetAddress.getByName(address[0]); 192 ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1]))); 193 } catch (UnknownHostException e) { 194 log.error("Unable to resolve host " + address[0], e); 195 } 196 } 197 } 198 assert ret != null : "Unable to create a remote client"; 199 return ret; 200 } 201 202 private void checkClusterHealth(String... indexNames) { 203 if (client == null) { 204 throw new IllegalStateException("No es client available"); 205 } 206 String errorMessage = null; 207 try { 208 log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames)); 209 ClusterHealthResponse ret = client.admin().cluster().prepareHealth(indexNames).setTimeout( 210 TIMEOUT_WAIT_FOR_CLUSTER).setWaitForYellowStatus().get(); 211 if (ret.isTimedOut()) { 212 errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: " 213 + ret; 214 } else { 215 if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) { 216 log.warn("Es Cluster ready but not GREEN: " + ret); 217 } else { 218 log.info("ES Cluster ready: " + ret); 219 } 220 } 221 } catch (NoNodeAvailableException e) { 222 errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(); 223 } 224 if (errorMessage != null) { 225 log.error(errorMessage); 226 throw new RuntimeException(errorMessage); 227 } 228 } 229 230 private void initializeIndexes() { 231 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 232 if (conf.isDocumentIndex()) { 233 log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName()); 234 indexNames.put(conf.getRepositoryName(), conf.getName()); 235 repoNames.put(conf.getName(), conf.getRepositoryName()); 236 Set<String> set = new LinkedHashSet<>(); 237 if (includeSourceFields != null) { 238 set.addAll(Arrays.asList(includeSourceFields)); 239 } 240 set.addAll(Arrays.asList(conf.getIncludes())); 241 if (set.contains(ALL_FIELDS)) { 242 set.clear(); 243 set.add(ALL_FIELDS); 244 } 245 includeSourceFields = set.toArray(new String[set.size()]); 246 set.clear(); 247 if (excludeSourceFields != null) { 248 set.addAll(Arrays.asList(excludeSourceFields)); 249 } 250 set.addAll(Arrays.asList(conf.getExcludes())); 251 excludeSourceFields = set.toArray(new String[set.size()]); 252 } 253 254 } 255 initIndexes(false); 256 } 257 258 // Admin Impl ============================================================= 259 @Override 260 public void refreshRepositoryIndex(String repositoryName) { 261 if (log.isDebugEnabled()) { 262 log.debug("Refreshing index associated with repo: " + repositoryName); 263 } 264 getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet(); 265 if (log.isDebugEnabled()) { 266 log.debug("Refreshing index done"); 267 } 268 } 269 270 @Override 271 public String getIndexNameForRepository(String repositoryName) { 272 String ret = indexNames.get(repositoryName); 273 if (ret == null) { 274 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 275 } 276 return ret; 277 } 278 279 @Override 280 public List<String> getIndexNamesForType(String type) { 281 List<String> indexNames = new ArrayList<>(); 282 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 283 if (type.equals(conf.getType())) { 284 indexNames.add(conf.getName()); 285 } 286 } 287 return indexNames; 288 } 289 290 @Override 291 public String getIndexNameForType(String type) { 292 List<String> indexNames = getIndexNamesForType(type); 293 if (indexNames.isEmpty()) { 294 throw new NoSuchElementException("No index defined for type: " + type); 295 } 296 return indexNames.get(0); 297 } 298 299 @Override 300 public void flushRepositoryIndex(String repositoryName) { 301 log.warn("Flushing index associated with repo: " + repositoryName); 302 getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet(); 303 log.info("Flushing index done"); 304 } 305 306 @Override 307 public void refresh() { 308 for (String repositoryName : indexNames.keySet()) { 309 refreshRepositoryIndex(repositoryName); 310 } 311 } 312 313 @Override 314 public void flush() { 315 for (String repositoryName : indexNames.keySet()) { 316 flushRepositoryIndex(repositoryName); 317 } 318 } 319 320 @Override 321 public void optimizeIndex(String indexName) { 322 log.warn("Optimizing index: " + indexName); 323 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 324 if (conf.getName().equals(indexName)) { 325 getClient().admin().indices().prepareOptimize(indexName).get(); 326 } 327 } 328 log.info("Optimize done"); 329 } 330 331 @Override 332 public void optimizeRepositoryIndex(String repositoryName) { 333 optimizeIndex(getIndexNameForRepository(repositoryName)); 334 } 335 336 @Override 337 public void optimize() { 338 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 339 optimizeIndex(conf.getName()); 340 } 341 } 342 343 @Override 344 public Client getClient() { 345 return client; 346 } 347 348 @Override 349 public void initIndexes(boolean dropIfExists) { 350 indexInitDone = false; 351 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 352 initIndex(conf, dropIfExists); 353 } 354 log.info("ES Service ready"); 355 indexInitDone = true; 356 } 357 358 @Override 359 public void dropAndInitIndex(String indexName) { 360 log.info("Drop and init index: " + indexName); 361 indexInitDone = false; 362 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 363 if (conf.getName().equals(indexName)) { 364 initIndex(conf, true); 365 } 366 } 367 indexInitDone = true; 368 } 369 370 @Override 371 public void dropAndInitRepositoryIndex(String repositoryName) { 372 log.info("Drop and init index of repository: " + repositoryName); 373 indexInitDone = false; 374 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 375 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 376 initIndex(conf, true); 377 } 378 } 379 indexInitDone = true; 380 } 381 382 @Override 383 public List<String> getRepositoryNames() { 384 return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet())); 385 } 386 387 void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 388 if (!conf.mustCreate()) { 389 return; 390 } 391 log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType())); 392 boolean mappingExists = false; 393 boolean indexExists = getClient().admin().indices().prepareExists(conf.getName()).execute().actionGet().isExists(); 394 if (indexExists) { 395 if (!dropIfExists) { 396 log.debug("Index " + conf.getName() + " already exists"); 397 mappingExists = getClient().admin().indices().prepareGetMappings(conf.getName()).execute().actionGet().getMappings().get( 398 conf.getName()).containsKey(conf.getType()); 399 } else { 400 if (!Framework.isTestModeSet()) { 401 log.warn(String.format("Initializing index: %s, type: %s with " 402 + "dropIfExists flag, deleting an existing index", conf.getName(), conf.getType())); 403 } 404 getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet(); 405 indexExists = false; 406 } 407 } 408 if (!indexExists) { 409 log.info(String.format("Creating index: %s", conf.getName())); 410 if (log.isDebugEnabled()) { 411 log.debug("Using settings: " + conf.getSettings()); 412 } 413 getClient().admin().indices().prepareCreate(conf.getName()).setSettings(conf.getSettings()).execute().actionGet(); 414 } 415 if (!mappingExists) { 416 log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName())); 417 if (log.isDebugEnabled()) { 418 log.debug("Using mapping: " + conf.getMapping()); 419 } 420 getClient().admin().indices().preparePutMapping(conf.getName()).setType(conf.getType()).setSource( 421 conf.getMapping()).execute().actionGet(); 422 if (!dropIfExists && conf.getRepositoryName() != null) { 423 repositoryInitialized.add(conf.getRepositoryName()); 424 } 425 } 426 // make sure the index is ready before returning 427 checkClusterHealth(conf.getName()); 428 } 429 430 @Override 431 public int getPendingWorkerCount() { 432 // impl of scheduling is left to the ESService 433 throw new UnsupportedOperationException("Not implemented"); 434 } 435 436 @Override 437 public int getRunningWorkerCount() { 438 // impl of scheduling is left to the ESService 439 throw new UnsupportedOperationException("Not implemented"); 440 } 441 442 @Override 443 public int getTotalCommandProcessed() { 444 return totalCommandProcessed.get(); 445 } 446 447 @Override 448 public boolean isEmbedded() { 449 return embedded; 450 } 451 452 @Override 453 public boolean isIndexingInProgress() { 454 // impl of scheduling is left to the ESService 455 throw new UnsupportedOperationException("Not implemented"); 456 } 457 458 @Override 459 public ListenableFuture<Boolean> prepareWaitForIndexing() { 460 throw new UnsupportedOperationException("Not implemented"); 461 } 462 463 /** 464 * Get the elastic search indexes for searches 465 */ 466 String[] getSearchIndexes(List<String> searchRepositories) { 467 if (searchRepositories.isEmpty()) { 468 Collection<String> values = indexNames.values(); 469 return values.toArray(new String[values.size()]); 470 } 471 String[] ret = new String[searchRepositories.size()]; 472 int i = 0; 473 for (String repo : searchRepositories) { 474 ret[i++] = getIndexNameForRepository(repo); 475 } 476 return ret; 477 } 478 479 public boolean isReady() { 480 return indexInitDone; 481 } 482 483 String[] getIncludeSourceFields() { 484 return includeSourceFields; 485 } 486 487 String[] getExcludeSourceFields() { 488 return excludeSourceFields; 489 } 490 491 Map<String, String> getRepositoryMap() { 492 return repoNames; 493 } 494 495 /** 496 * Get the list of repository names that have their index created. 497 */ 498 public List<String> getInitializedRepositories() { 499 return repositoryInitialized; 500 } 501}