/*
 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */
package org.nuxeo.elasticsearch;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.query.QueryBuilder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.elasticsearch.api.ESClient;
import org.nuxeo.elasticsearch.api.ESHintQueryBuilder;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.api.EsScrollResult;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.config.ESHintQueryBuilderDescriptor;
import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
import org.nuxeo.elasticsearch.io.JsonESDocumentWriter;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.elasticsearch.work.IndexingWorker;
import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * Component used to configure and manage ElasticSearch integration.
 * <p>
 * It collects the contributions of the Elasticsearch extension points and, once started, delegates the
 * {@link ElasticSearchAdmin}, {@link ElasticSearchIndexing} and {@link ElasticSearchService} operations to
 * dedicated implementation objects. Indexing commands received while the service is not ready are stacked and
 * replayed by {@link #processStackedCommands()} during {@link #start(ComponentContext)}.
 */
public class ElasticSearchComponent extends DefaultComponent
        implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService {

    protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class);

    protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer";

    protected static final String EP_CLIENT_INIT = "elasticSearchClient";

    protected static final String EP_INDEX = "elasticSearchIndex";

    protected static final String EP_DOC_WRITER = "elasticSearchDocWriter";

    /**
     * @since 11.1
     */
    protected static final String EP_HINTS = "elasticSearchHints";

    /** Maximum time, in seconds, that startup waits for a repository re-indexing before continuing. */
    protected static final long REINDEX_TIMEOUT = 20;

    // Indexing commands that were received before the index initialization
    protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());

    /** Enabled index configurations, keyed by the name returned by {@code ElasticSearchIndexConfig#getName()}. */
    protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();

    /** Number of {@link #runIndexingWorker(List)} invocations currently dispatching work. */
    protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);

    protected ElasticSearchEmbeddedServerConfig embeddedServerConfig;

    protected ElasticSearchClientConfig clientConfig;

    protected ElasticSearchAdminImpl esa;

    protected ElasticSearchIndexingImpl esi;

    protected ElasticSearchServiceImpl ess;

    protected JsonESDocumentWriter jsonESDocumentWriter;

    protected ListeningExecutorService waiterExecutorService;

    // Nuxeo Component impl ====================================================

    /**
     * Stores the contribution made to one of the Elasticsearch extension points.
     *
     * @param contribution the contributed descriptor, cast according to {@code extensionPoint}
     * @param extensionPoint the name of the extension point being contributed to
     * @param contributor the contributing component, only used for logging
     * @throws NuxeoException if the contributed document writer class cannot be instantiated
     * @throws IllegalStateException if {@code extensionPoint} is not one of the known extension points
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        switch (extensionPoint) {
        case EP_EMBEDDED_SERVER:
            ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution;
            if (serverContrib.isEnabled()) {
                embeddedServerConfig = serverContrib;
                log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from "
                        + contributor.getName());
            } else if (embeddedServerConfig != null) {
                log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName());
                embeddedServerConfig = null;
            }
            break;
        case EP_CLIENT_INIT:
            // the last contributed client configuration wins
            clientConfig = (ElasticSearchClientConfig) contribution;
            break;
        case EP_INDEX:
            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
            if (idx.isEnabled()) {
                // previous may be null when this is the first contribution for that name
                idx.merge(previous);
                indexConfig.put(idx.getName(), idx);
                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
            } else if (previous != null) {
                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
                indexConfig.remove(idx.getName());
            }
            break;
        case EP_DOC_WRITER:
            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
            try {
                jsonESDocumentWriter = writerDescriptor.getKlass().getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
                throw new NuxeoException(e);
            }
            break;
        case EP_HINTS:
            ESHintQueryBuilderDescriptor esHintDescriptor = (ESHintQueryBuilderDescriptor) contribution;
            register(EP_HINTS, esHintDescriptor);
            break;
        default:
            throw new IllegalStateException("Invalid EP: " + extensionPoint);
        }
    }

    /**
     * Builds the admin, indexing and search delegates from the registered contributions, then replays the
     * stacked indexing commands and optionally re-indexes the repositories. Does nothing but log when
     * Elasticsearch is disabled.
     */
    @Override
    public void start(ComponentContext context) {
        if (!isElasticsearchEnabled()) {
            log.info("Elasticsearch service is disabled");
            return;
        }
        esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig, getDescriptors(EP_HINTS));
        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
        ess = new ElasticSearchServiceImpl(esa);
        initListenerThreadPool();
        processStackedCommands();
        reindexOnStartup();
    }

    /**
     * Shuts down the waiter thread pool, disconnects the admin delegate and drops the delegates. The nested
     * {@code finally} blocks make sure each step runs even when the previous one throws.
     */
    @Override
    public void stop(ComponentContext context) {
        if (esa == null) {
            // Elasticsearch service was disabled
            return;
        }
        try {
            shutdownListenerThreadPool();
        } finally {
            try {
                esa.disconnect();
            } finally {
                esa = null;
                esi = null;
                ess = null;
            }
        }
    }

    /**
     * Schedules a full re-indexing of every initialized repository when the
     * {@code REINDEX_ON_STARTUP_PROPERTY} framework property is {@code true}, waiting at most
     * {@link #REINDEX_TIMEOUT} seconds per repository before letting the indexing continue in background.
     */
    protected void reindexOnStartup() {
        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
        if (!reindexOnStartup) {
            return;
        }
        for (String repositoryName : esa.getInitializedRepositories()) {
            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
            try {
                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // restore the interrupt flag for the caller; remaining repositories are still scheduled
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.error(e.getMessage(), e);
            } catch (TimeoutException e) {
                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
                        repositoryName, REINDEX_TIMEOUT));
            }
        }
    }

    /**
     * @return the value of the {@code ES_ENABLED_PROPERTY} framework property, {@code true} when it is not set
     */
    protected boolean isElasticsearchEnabled() {
        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
    }

    /**
     * @return half of the started order of the repository service component; presumably chosen so that this
     *         component is ordered before the repository service (to be confirmed against the runtime ordering
     *         rules)
     */
    @Override
    public int getApplicationStartedOrder() {
        RepositoryService component = (RepositoryService) Framework.getRuntime()
                                                                   .getComponent(
                                                                           "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
        return component.getApplicationStartedOrder() / 2;
    }

    /**
     * Replays the indexing commands that were stacked before the service was ready.
     * <p>
     * The stacked commands are moved to a private snapshot while holding the lock of the synchronized list, so
     * that commands stacked concurrently are neither lost by the clear nor iterated without synchronization.
     * If the service is still not ready, {@link #runIndexingWorker(List)} stacks the snapshot again instead of
     * dropping it.
     */
    void processStackedCommands() {
        List<IndexingCommand> pending;
        synchronized (stackedCommands) {
            if (stackedCommands.isEmpty()) {
                return;
            }
            pending = new ArrayList<>(stackedCommands);
            stackedCommands.clear();
        }
        log.info(String.format("Processing %d indexing commands stacked during startup", pending.size()));
        runIndexingWorker(pending);
        log.debug("Done");
    }

    // Es Admin ================================================================

    @Override
    public ESClient getClient() {
        return esa.getClient();
    }

    @Override
    public void initIndexes(boolean dropIfExists) {
        esa.initIndexes(dropIfExists);
    }

    @Override
    public void dropAndInitIndex(String indexName) {
        esa.dropAndInitIndex(indexName);
    }

    @Override
    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
        esa.dropAndInitRepositoryIndex(repositoryName, syncAlias);
    }

    @Override
    public List<String> getRepositoryNames() {
        return esa.getRepositoryNames();
    }

    @Override
    public String getIndexNameForRepository(String repositoryName) {
        return esa.getIndexNameForRepository(repositoryName);
    }

    @Override
    public String getRepositoryForIndex(String indexName) {
        return esa.getRepositoryForIndex(indexName);
    }

    @Override
    public List<String> getIndexNamesForType(String type) {
        return esa.getIndexNamesForType(type);
    }

    @Override
    public String getIndexNameForType(String type) {
        return esa.getIndexNameForType(type);
    }

    @Override
    public String getWriteIndexName(String searchIndexName) {
        return esa.getWriteIndexName(searchIndexName);
    }

    @Override
    public void syncSearchAndWriteAlias(String searchIndexName) {
        esa.syncSearchAndWriteAlias(searchIndexName);
    }

    /**
     * @return the number of scheduled works in the indexing queue of the {@link WorkManager}
     */
    @SuppressWarnings("deprecation")
    @Override
    public long getPendingWorkerCount() {
        WorkManager wm = Framework.getService(WorkManager.class);
        // api is deprecated for completed work
        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
    }

    /**
     * @return the number of in-flight {@link #runIndexingWorker(List)} calls plus the number of running works in
     *         the indexing queue of the {@link WorkManager}
     */
    @SuppressWarnings("deprecation")
    @Override
    public long getRunningWorkerCount() {
        WorkManager wm = Framework.getService(WorkManager.class);
        // api is deprecated for completed work
        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
    }

    @Override
    public int getTotalCommandProcessed() {
        return esa.getTotalCommandProcessed();
    }

    @Override
    public boolean isEmbedded() {
        return esa.isEmbedded();
    }

    @Override
    public boolean useExternalVersion() {
        return esa.useExternalVersion();
    }

    /**
     * @return {@code true} when an indexing dispatch is in flight or when the indexing queue has pending or
     *         running works
     */
    @Override
    public boolean isIndexingInProgress() {
        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
    }

    /**
     * Submits to the waiter thread pool a task that blocks until the indexing queue of the {@link WorkManager}
     * reports completion, polling in slices of 300 seconds.
     *
     * @return a future completed with {@code true} once the indexing queue is complete
     */
    @Override
    public ListenableFuture<Boolean> prepareWaitForIndexing() {
        return waiterExecutorService.submit(() -> {
            WorkManager wm = Framework.getService(WorkManager.class);
            boolean completed;
            do {
                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
            } while (!completed);
            return true;
        });
    }

    /** Creates the cached thread pool used by {@link #prepareWaitForIndexing()}. */
    protected void initListenerThreadPool() {
        waiterExecutorService = MoreExecutors.listeningDecorator(
                Executors.newCachedThreadPool(new NamedThreadFactory()));
    }

    /** Requests the shutdown of the waiter thread pool and drops its reference, even if the shutdown throws. */
    protected void shutdownListenerThreadPool() {
        try {
            waiterExecutorService.shutdown();
        } finally {
            waiterExecutorService = null;
        }
    }

    @Override
    public void refresh() {
        esa.refresh();
    }

    @Override
    public void refreshRepositoryIndex(String repositoryName) {
        esa.refreshRepositoryIndex(repositoryName);
    }

    @Override
    public void flush() {
        esa.flush();
    }

    @Override
    public void flushRepositoryIndex(String repositoryName) {
        esa.flushRepositoryIndex(repositoryName);
    }

    @Override
    public void optimize() {
        esa.optimize();
    }

    @Override
    public void optimizeRepositoryIndex(String repositoryName) {
        esa.optimizeRepositoryIndex(repositoryName);
    }

    @Override
    public void optimizeIndex(String indexName) {
        esa.optimizeIndex(indexName);
    }

    // ES Indexing =============================================================

    /**
     * Same as {@link #indexNonRecursive(List)} for a single command.
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        indexNonRecursive(Collections.singletonList(cmd));
    }

    /**
     * Hands the commands to the indexing delegate, or stacks them when the service is not ready yet.
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
        }
        esi.indexNonRecursive(cmds);
    }

    /**
     * Keeps the commands aside until {@link #processStackedCommands()} replays them.
     */
    protected void stackCommands(List<IndexingCommand> cmds) {
        if (log.isDebugEnabled()) {
            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
                    + Arrays.toString(cmds.toArray()));
        }
        stackedCommands.addAll(cmds);
    }

    /**
     * Dispatches the commands to sync and async workers, or stacks them when the service is not ready yet.
     * The in-flight counter is incremented for the duration of the dispatch.
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        runIndexingWorkerCount.incrementAndGet();
        try {
            dispatchWork(cmds);
        } finally {
            runIndexingWorkerCount.decrementAndGet();
        }
    }

    /**
     * Dispatch jobs between sync and async worker: commands are grouped by repository name, the sync groups are
     * run first in the calling thread, then the async groups are scheduled.
     */
    protected void dispatchWork(List<IndexingCommand> cmds) {
        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
        for (IndexingCommand cmd : cmds) {
            Map<String, List<IndexingCommand>> target = cmd.isSync() ? syncCommands : asyncCommands;
            target.computeIfAbsent(cmd.getRepositoryName(), name -> new ArrayList<>()).add(cmd);
        }
        runIndexingSyncWorker(syncCommands);
        scheduleIndexingAsyncWorker(asyncCommands);
    }

    /**
     * Schedules one {@link IndexingWorker} per repository on the {@link WorkManager}.
     *
     * @param asyncCommands the asynchronous commands grouped by repository name
     */
    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
        if (asyncCommands.isEmpty()) {
            return;
        }
        WorkManager wm = Framework.getService(WorkManager.class);
        for (Map.Entry<String, List<IndexingCommand>> entry : asyncCommands.entrySet()) {
            IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
            // we are in afterCompletion don't wait for a commit
            wm.schedule(idxWork, false);
        }
    }

    /**
     * Runs one {@link IndexingWorker} per repository in the calling thread, outside of any transaction.
     *
     * @param syncCommands the synchronous commands grouped by repository name
     */
    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
        if (syncCommands.isEmpty()) {
            return;
        }
        TransactionHelper.runWithoutTransaction(() -> {
            for (Map.Entry<String, List<IndexingCommand>> entry : syncCommands.entrySet()) {
                IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
                idxWork.run();
            }
        });
    }

    /**
     * Schedules a {@link ScrollingIndexingWorker} for the documents matching the given query.
     *
     * @throws IllegalArgumentException if {@code nxql} is {@code null} or empty
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
        if (nxql == null || nxql.isEmpty()) {
            throw new IllegalArgumentException("Expecting an NXQL query");
        }
        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias);
        WorkManager wm = Framework.getService(WorkManager.class);
        wm.schedule(worker);
    }

    /**
     * Drops and re-creates the index of the repository, then schedules the re-indexing of all its documents.
     */
    @Override
    public void reindexRepository(String repositoryName) {
        esa.dropAndInitRepositoryIndex(repositoryName, false);
        runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true);
    }

    @Override
    public BytesReference source(DocumentModel doc) throws IOException {
        return esi.source(doc);
    }

    // ES Search ===============================================================

    @Override
    public DocumentModelList query(NxQueryBuilder queryBuilder) {
        return ess.query(queryBuilder);
    }

    @Override
    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
        return ess.queryAndAggregate(queryBuilder);
    }

    @Override
    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
        return ess.scroll(queryBuilder, keepAlive);
    }

    @Override
    public EsScrollResult scroll(EsScrollResult scrollResult) {
        return ess.scroll(scrollResult);
    }

    @Override
    public void clearScroll(EsScrollResult scrollResult) {
        ess.clearScroll(scrollResult);
    }

    /**
     * @deprecated build a {@link NxQueryBuilder} and use {@link #query(NxQueryBuilder)} instead, which is what
     *             this method does
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
        return query(query);
    }

    /**
     * @deprecated build a {@link NxQueryBuilder} and use {@link #query(NxQueryBuilder)} instead, which is what
     *             this method does
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
            SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
                                                          .limit(limit)
                                                          .offset(offset)
                                                          .addSort(sortInfos);
        return query(query);
    }

    // misc ====================================================================

    /**
     * @return {@code true} when the admin delegate exists and reports being ready
     */
    protected boolean isReady() {
        return (esa != null) && esa.isReady();
    }

    /** Thread factory naming every thread of the waiter pool {@code waitForEsIndexing}. */
    protected static class NamedThreadFactory implements ThreadFactory {
        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "waitForEsIndexing");
        }
    }

    @Override
    public Optional<ESHintQueryBuilder> getHintByOperator(String name) {
        return esa.getHintByOperator(name);
    }
}