001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Tiry 016 * bdelbosc 017 */ 018 019package org.nuxeo.elasticsearch.core; 020 021import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS; 022 023import java.net.InetAddress; 024import java.net.UnknownHostException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.LinkedHashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.NoSuchElementException; 034import java.util.Set; 035import java.util.concurrent.atomic.AtomicInteger; 036 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 040import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; 041import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 042import org.elasticsearch.client.Client; 043import org.elasticsearch.client.transport.NoNodeAvailableException; 044import org.elasticsearch.client.transport.TransportClient; 045import org.elasticsearch.common.settings.ImmutableSettings; 046import org.elasticsearch.common.settings.ImmutableSettings.Builder; 047import org.elasticsearch.common.settings.Settings; 048import org.elasticsearch.common.transport.InetSocketTransportAddress; 049import org.elasticsearch.node.Node; 050import org.elasticsearch.node.NodeBuilder; 051import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 052import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 053import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 054import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 055import org.nuxeo.runtime.api.Framework; 056 057import com.google.common.util.concurrent.ListenableFuture; 058 059/** 060 * @since 6.0 061 */ 062public class ElasticSearchAdminImpl implements ElasticSearchAdmin { 063 private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class); 064 065 private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s"; 066 067 final AtomicInteger totalCommandProcessed = new AtomicInteger(0); 068 069 private final Map<String, String> indexNames = new HashMap<>(); 070 071 private final Map<String, String> repoNames = new HashMap<>(); 072 073 private final Map<String, ElasticSearchIndexConfig> indexConfig; 074 075 private Node localNode; 076 077 private Client client; 078 079 private boolean indexInitDone = false; 080 081 private final ElasticSearchLocalConfig localConfig; 082 083 private final ElasticSearchRemoteConfig remoteConfig; 084 085 private String[] includeSourceFields; 086 087 private String[] excludeSourceFields; 088 089 private boolean embedded = true; 090 091 private List<String> repositoryInitialized = new ArrayList<>(); 092 093 /** 094 * Init the admin service, remote configuration if not null will take precedence over local embedded configuration. 095 */ 096 public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig, 097 Map<String, ElasticSearchIndexConfig> indexConfig) { 098 this.remoteConfig = remoteConfig; 099 this.localConfig = localConfig; 100 this.indexConfig = indexConfig; 101 connect(); 102 initializeIndexes(); 103 } 104 105 private void connect() { 106 if (client != null) { 107 return; 108 } 109 if (remoteConfig != null) { 110 client = connectToRemote(remoteConfig); 111 embedded = false; 112 } else { 113 localNode = createEmbeddedNode(localConfig); 114 client = connectToEmbedded(); 115 embedded = true; 116 } 117 checkClusterHealth(); 118 log.info("ES Connected"); 119 } 120 121 public void disconnect() { 122 if (client != null) { 123 client.close(); 124 client = null; 125 indexInitDone = false; 126 log.info("ES Disconnected"); 127 } 128 if (localNode != null) { 129 localNode.close(); 130 localNode = null; 131 log.info("ES embedded Node Stopped"); 132 } 133 } 134 135 private Node createEmbeddedNode(ElasticSearchLocalConfig conf) { 136 log.info("ES embedded Node Initializing (local in JVM)"); 137 if (conf == null) { 138 throw new IllegalStateException("No embedded configuration defined"); 139 } 140 if (!Framework.isTestModeSet()) { 141 log.warn("Elasticsearch embedded configuration is ONLY for testing" 142 + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production."); 143 } 144 Builder sBuilder = ImmutableSettings.settingsBuilder(); 145 sBuilder.put("http.enabled", conf.httpEnabled()).put("network.host", conf.getNetworkHost()).put("path.data", 146 conf.getDataPath()).put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put( 147 "cluster.name", conf.getClusterName()).put("node.name", conf.getNodeName()).put( 148 "http.netty.worker_count", 4).put("http.cors.enabled", true).put( 149 "cluster.routing.allocation.disk.threshold_enabled", false); 150 if (conf.getIndexStorageType() != null) { 151 sBuilder.put("index.store.type", conf.getIndexStorageType()); 152 if (conf.getIndexStorageType().equals("memory")) { 153 sBuilder.put("gateway.type", "none"); 154 } 155 } 156 Settings settings = sBuilder.build(); 157 log.debug("Using settings: " + settings.toDelimitedString(',')); 158 Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node(); 159 assert ret != null : "Can not create an embedded ES Node"; 160 return ret; 161 } 162 163 private Client connectToEmbedded() { 164 log.info("Connecting to embedded ES"); 165 Client ret = localNode.start().client(); 166 assert ret != null : "Can not connect to embedded ES Node"; 167 return ret; 168 } 169 170 private Client connectToRemote(ElasticSearchRemoteConfig config) { 171 log.info("Connecting to remote ES cluster: " + config); 172 Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", config.getClusterName()).put( 173 "client.transport.nodes_sampler_interval", config.getSamplerInterval()).put( 174 "client.transport.ping_timeout", config.getPingTimeout()).put("client.transport.ignore_cluster_name", 175 config.isIgnoreClusterName()).put("client.transport.sniff", config.isClusterSniff()); 176 Settings settings = builder.build(); 177 if (log.isDebugEnabled()) { 178 log.debug("Using settings: " + settings.toDelimitedString(',')); 179 } 180 TransportClient ret = new TransportClient(settings); 181 String[] addresses = config.getAddresses(); 182 if (addresses == null) { 183 log.error("You need to provide an addressList to join a cluster"); 184 } else { 185 for (String item : config.getAddresses()) { 186 String[] address = item.split(":"); 187 log.debug("Add transport address: " + item); 188 try { 189 InetAddress inet = InetAddress.getByName(address[0]); 190 ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1]))); 191 } catch (UnknownHostException e) { 192 log.error("Unable to resolve host " + address[0], e); 193 } 194 } 195 } 196 assert ret != null : "Unable to create a remote client"; 197 return ret; 198 } 199 200 private void checkClusterHealth(String... indexNames) { 201 if (client == null) { 202 throw new IllegalStateException("No es client available"); 203 } 204 String errorMessage = null; 205 try { 206 log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames)); 207 ClusterHealthResponse ret = client.admin().cluster().prepareHealth(indexNames).setTimeout( 208 TIMEOUT_WAIT_FOR_CLUSTER).setWaitForYellowStatus().get(); 209 if (ret.isTimedOut()) { 210 errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: " 211 + ret; 212 } else { 213 if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) { 214 log.warn("Es Cluster ready but not GREEN: " + ret); 215 } else { 216 log.info("ES Cluster ready: " + ret); 217 } 218 } 219 } catch (NoNodeAvailableException e) { 220 errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(); 221 } 222 if (errorMessage != null) { 223 log.error(errorMessage); 224 throw new RuntimeException(errorMessage); 225 } 226 } 227 228 private void initializeIndexes() { 229 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 230 if (conf.isDocumentIndex()) { 231 log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName()); 232 indexNames.put(conf.getRepositoryName(), conf.getName()); 233 repoNames.put(conf.getName(), conf.getRepositoryName()); 234 Set<String> set = new LinkedHashSet<>(); 235 if (includeSourceFields != null) { 236 set.addAll(Arrays.asList(includeSourceFields)); 237 } 238 set.addAll(Arrays.asList(conf.getIncludes())); 239 if (set.contains(ALL_FIELDS)) { 240 set.clear(); 241 set.add(ALL_FIELDS); 242 } 243 includeSourceFields = set.toArray(new String[set.size()]); 244 set.clear(); 245 if (excludeSourceFields != null) { 246 set.addAll(Arrays.asList(excludeSourceFields)); 247 } 248 set.addAll(Arrays.asList(conf.getExcludes())); 249 excludeSourceFields = set.toArray(new String[set.size()]); 250 } 251 252 } 253 initIndexes(false); 254 } 255 256 // Admin Impl ============================================================= 257 @Override 258 public void refreshRepositoryIndex(String repositoryName) { 259 if (log.isDebugEnabled()) { 260 log.debug("Refreshing index associated with repo: " + repositoryName); 261 } 262 getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet(); 263 if (log.isDebugEnabled()) { 264 log.debug("Refreshing index done"); 265 } 266 } 267 268 @Override 269 public String getIndexNameForRepository(String repositoryName) { 270 String ret = indexNames.get(repositoryName); 271 if (ret == null) { 272 throw new NoSuchElementException("No index defined for repository: " + repositoryName); 273 } 274 return ret; 275 } 276 277 @Override 278 public List<String> getIndexNamesForType(String type) { 279 List<String> indexNames = new ArrayList<>(); 280 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 281 if (type.equals(conf.getType())) { 282 indexNames.add(conf.getName()); 283 } 284 } 285 return indexNames; 286 } 287 288 @Override 289 public String getIndexNameForType(String type) { 290 List<String> indexNames = getIndexNamesForType(type); 291 if (indexNames.isEmpty()) { 292 throw new NoSuchElementException("No index defined for type: " + type); 293 } 294 return indexNames.get(0); 295 } 296 297 @Override 298 public void flushRepositoryIndex(String repositoryName) { 299 log.warn("Flushing index associated with repo: " + repositoryName); 300 getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet(); 301 log.info("Flushing index done"); 302 } 303 304 @Override 305 public void refresh() { 306 for (String repositoryName : indexNames.keySet()) { 307 refreshRepositoryIndex(repositoryName); 308 } 309 } 310 311 @Override 312 public void flush() { 313 for (String repositoryName : indexNames.keySet()) { 314 flushRepositoryIndex(repositoryName); 315 } 316 } 317 318 @Override 319 public void optimizeIndex(String indexName) { 320 log.warn("Optimizing index: " + indexName); 321 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 322 if (conf.getName().equals(indexName)) { 323 getClient().admin().indices().prepareOptimize(indexName).get(); 324 } 325 } 326 log.info("Optimize done"); 327 } 328 329 @Override 330 public void optimizeRepositoryIndex(String repositoryName) { 331 optimizeIndex(getIndexNameForRepository(repositoryName)); 332 } 333 334 @Override 335 public void optimize() { 336 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 337 optimizeIndex(conf.getName()); 338 } 339 } 340 341 @Override 342 public Client getClient() { 343 return client; 344 } 345 346 @Override 347 public void initIndexes(boolean dropIfExists) { 348 indexInitDone = false; 349 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 350 initIndex(conf, dropIfExists); 351 } 352 log.info("ES Service ready"); 353 indexInitDone = true; 354 } 355 356 @Override 357 public void dropAndInitIndex(String indexName) { 358 log.info("Drop and init index: " + indexName); 359 indexInitDone = false; 360 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 361 if (conf.getName().equals(indexName)) { 362 initIndex(conf, true); 363 } 364 } 365 indexInitDone = true; 366 } 367 368 @Override 369 public void dropAndInitRepositoryIndex(String repositoryName) { 370 log.info("Drop and init index of repository: " + repositoryName); 371 indexInitDone = false; 372 for (ElasticSearchIndexConfig conf : indexConfig.values()) { 373 if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) { 374 initIndex(conf, true); 375 } 376 } 377 indexInitDone = true; 378 } 379 380 @Override 381 public List<String> getRepositoryNames() { 382 return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet())); 383 } 384 385 void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) { 386 if (!conf.mustCreate()) { 387 return; 388 } 389 log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType())); 390 boolean mappingExists = false; 391 boolean indexExists = getClient().admin().indices().prepareExists(conf.getName()).execute().actionGet().isExists(); 392 if (indexExists) { 393 if (!dropIfExists) { 394 log.debug("Index " + conf.getName() + " already exists"); 395 mappingExists = getClient().admin().indices().prepareGetMappings(conf.getName()).execute().actionGet().getMappings().get( 396 conf.getName()).containsKey(conf.getType()); 397 } else { 398 if (!Framework.isTestModeSet()) { 399 log.warn(String.format("Initializing index: %s, type: %s with " 400 + "dropIfExists flag, deleting an existing index", conf.getName(), conf.getType())); 401 } 402 getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet(); 403 indexExists = false; 404 } 405 } 406 if (!indexExists) { 407 log.info(String.format("Creating index: %s", conf.getName())); 408 if (log.isDebugEnabled()) { 409 log.debug("Using settings: " + conf.getSettings()); 410 } 411 getClient().admin().indices().prepareCreate(conf.getName()).setSettings(conf.getSettings()).execute().actionGet(); 412 } 413 if (!mappingExists) { 414 log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName())); 415 if (log.isDebugEnabled()) { 416 log.debug("Using mapping: " + conf.getMapping()); 417 } 418 getClient().admin().indices().preparePutMapping(conf.getName()).setType(conf.getType()).setSource( 419 conf.getMapping()).execute().actionGet(); 420 if (!dropIfExists && conf.getRepositoryName() != null) { 421 repositoryInitialized.add(conf.getRepositoryName()); 422 } 423 } 424 // make sure the index is ready before returning 425 checkClusterHealth(conf.getName()); 426 } 427 428 @Override 429 public int getPendingWorkerCount() { 430 // impl of scheduling is left to the ESService 431 throw new UnsupportedOperationException("Not implemented"); 432 } 433 434 @Override 435 public int getRunningWorkerCount() { 436 // impl of scheduling is left to the ESService 437 throw new UnsupportedOperationException("Not implemented"); 438 } 439 440 @Override 441 public int getTotalCommandProcessed() { 442 return totalCommandProcessed.get(); 443 } 444 445 @Override 446 public boolean isEmbedded() { 447 return embedded; 448 } 449 450 @Override 451 public boolean isIndexingInProgress() { 452 // impl of scheduling is left to the ESService 453 throw new UnsupportedOperationException("Not implemented"); 454 } 455 456 @Override 457 public ListenableFuture<Boolean> prepareWaitForIndexing() { 458 throw new UnsupportedOperationException("Not implemented"); 459 } 460 461 /** 462 * Get the elastic search indexes for searches 463 */ 464 String[] getSearchIndexes(List<String> searchRepositories) { 465 if (searchRepositories.isEmpty()) { 466 Collection<String> values = indexNames.values(); 467 return values.toArray(new String[values.size()]); 468 } 469 String[] ret = new String[searchRepositories.size()]; 470 int i = 0; 471 for (String repo : searchRepositories) { 472 ret[i++] = getIndexNameForRepository(repo); 473 } 474 return ret; 475 } 476 477 public boolean isReady() { 478 return indexInitDone; 479 } 480 481 String[] getIncludeSourceFields() { 482 return includeSourceFields; 483 } 484 485 String[] getExcludeSourceFields() { 486 return excludeSourceFields; 487 } 488 489 Map<String, String> getRepositoryMap() { 490 return repoNames; 491 } 492 493 /** 494 * Get the list of repository names that have their index created. 495 */ 496 public List<String> getInitializedRepositories() { 497 return repositoryInitialized; 498 } 499}