001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020package org.nuxeo.elasticsearch; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 025 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import javax.transaction.Transaction; 040 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.elasticsearch.index.query.QueryBuilder; 044import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 045import org.nuxeo.ecm.core.api.CoreSession; 046import org.nuxeo.ecm.core.api.DocumentModelList; 047import org.nuxeo.ecm.core.api.NuxeoException; 048import org.nuxeo.ecm.core.api.SortInfo; 049import org.nuxeo.ecm.core.repository.RepositoryService; 050import org.nuxeo.ecm.core.work.api.Work; 051import org.nuxeo.ecm.core.work.api.WorkManager; 052import org.nuxeo.elasticsearch.api.ESClient; 053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 055import org.nuxeo.elasticsearch.api.ElasticSearchService; 056import org.nuxeo.elasticsearch.api.EsResult; 057import org.nuxeo.elasticsearch.api.EsScrollResult; 058import org.nuxeo.elasticsearch.commands.IndexingCommand; 059import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 060import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor; 061import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 062import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 063import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl; 064import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl; 065import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl; 066import org.nuxeo.elasticsearch.query.NxQueryBuilder; 067import org.nuxeo.elasticsearch.work.IndexingWorker; 068import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker; 069import org.nuxeo.runtime.api.Framework; 070import org.nuxeo.runtime.model.ComponentContext; 071import org.nuxeo.runtime.model.ComponentInstance; 072import org.nuxeo.runtime.model.DefaultComponent; 073import org.nuxeo.runtime.transaction.TransactionHelper; 074 075import com.google.common.util.concurrent.ListenableFuture; 076import com.google.common.util.concurrent.ListeningExecutorService; 077import com.google.common.util.concurrent.MoreExecutors; 078 079/** 080 * Component used to configure and manage ElasticSearch integration 081 */ 082public class ElasticSearchComponent extends DefaultComponent 083 implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService { 084 085 protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class); 086 087 protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer"; 088 089 protected static final String EP_CLIENT_INIT = "elasticSearchClient"; 090 091 protected static final String EP_INDEX = "elasticSearchIndex"; 092 093 protected static final String EP_DOC_WRITER = "elasticSearchDocWriter"; 094 095 protected static final long REINDEX_TIMEOUT = 20; 096 097 // Indexing commands that where received before the index initialization 098 protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>()); 099 100 protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>(); 101 102 protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0); 103 104 protected ElasticSearchEmbeddedServerConfig embeddedServerConfig; 105 106 protected ElasticSearchClientConfig clientConfig; 107 108 protected ElasticSearchAdminImpl esa; 109 110 protected ElasticSearchIndexingImpl esi; 111 112 protected ElasticSearchServiceImpl ess; 113 114 protected JsonESDocumentWriter jsonESDocumentWriter; 115 116 protected ListeningExecutorService waiterExecutorService; 117 118 // Nuxeo Component impl ======================================é============= 119 @Override 120 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 121 switch (extensionPoint) { 122 case EP_EMBEDDED_SERVER: 123 ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution; 124 if (serverContrib.isEnabled()) { 125 embeddedServerConfig = serverContrib; 126 log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from " 127 + contributor.getName()); 128 } else if (embeddedServerConfig != null) { 129 log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName()); 130 embeddedServerConfig = null; 131 } 132 break; 133 case EP_CLIENT_INIT: 134 clientConfig = (ElasticSearchClientConfig) contribution; 135 break; 136 case EP_INDEX: 137 ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution; 138 ElasticSearchIndexConfig previous = indexConfig.get(idx.getName()); 139 if (idx.isEnabled()) { 140 idx.merge(previous); 141 indexConfig.put(idx.getName(), idx); 142 log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName()); 143 } else if (previous != null) { 144 log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName()); 145 indexConfig.remove(idx.getName()); 146 } 147 break; 148 case EP_DOC_WRITER: 149 ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution; 150 try { 151 jsonESDocumentWriter = writerDescriptor.getKlass().newInstance(); 152 } catch (IllegalAccessException | InstantiationException e) { 153 log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass()); 154 throw new NuxeoException(e); 155 } 156 break; 157 default: 158 throw new IllegalStateException("Invalid EP: " + extensionPoint); 159 } 160 } 161 162 @Override 163 public void start(ComponentContext context) { 164 if (!isElasticsearchEnabled()) { 165 log.info("Elasticsearch service is disabled"); 166 return; 167 } 168 esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig); 169 esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter); 170 ess = new ElasticSearchServiceImpl(esa); 171 initListenerThreadPool(); 172 processStackedCommands(); 173 reindexOnStartup(); 174 } 175 176 @Override 177 public void stop(ComponentContext context) { 178 if (esa == null) { 179 // Elasticsearch service was disabled 180 return; 181 } 182 try { 183 shutdownListenerThreadPool(); 184 } finally { 185 try { 186 esa.disconnect(); 187 } finally { 188 esa = null; 189 esi = null; 190 ess = null; 191 } 192 } 193 } 194 195 protected void reindexOnStartup() { 196 boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false")); 197 if (!reindexOnStartup) { 198 return; 199 } 200 for (String repositoryName : esa.getInitializedRepositories()) { 201 log.warn(String.format("Indexing repository: %s on startup", repositoryName)); 202 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 203 try { 204 prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS); 205 } catch (InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 } catch (ExecutionException e) { 208 log.error(e.getMessage(), e); 209 } catch (TimeoutException e) { 210 log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background", 211 repositoryName, REINDEX_TIMEOUT)); 212 } 213 } 214 } 215 216 protected boolean isElasticsearchEnabled() { 217 return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true")); 218 } 219 220 @Override 221 public int getApplicationStartedOrder() { 222 RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent( 223 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 224 return component.getApplicationStartedOrder() / 2; 225 } 226 227 void processStackedCommands() { 228 if (!stackedCommands.isEmpty()) { 229 log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size())); 230 runIndexingWorker(stackedCommands); 231 stackedCommands.clear(); 232 log.debug("Done"); 233 } 234 } 235 236 // Es Admin ================================================================ 237 238 @Override 239 public ESClient getClient() { 240 return esa.getClient(); 241 } 242 243 @Override 244 public void initIndexes(boolean dropIfExists) { 245 esa.initIndexes(dropIfExists); 246 } 247 248 @Override 249 public void dropAndInitIndex(String indexName) { 250 esa.dropAndInitIndex(indexName); 251 } 252 253 @Override 254 public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) { 255 esa.dropAndInitRepositoryIndex(repositoryName, syncAlias); 256 } 257 258 @Override 259 public List<String> getRepositoryNames() { 260 return esa.getRepositoryNames(); 261 } 262 263 @Override 264 public String getIndexNameForRepository(String repositoryName) { 265 return esa.getIndexNameForRepository(repositoryName); 266 } 267 268 @Override 269 public String getRepositoryForIndex(String indexName) { 270 return esa.getRepositoryForIndex(indexName); 271 } 272 273 @Override 274 public List<String> getIndexNamesForType(String type) { 275 return esa.getIndexNamesForType(type); 276 } 277 278 @Override 279 public String getIndexNameForType(String type) { 280 return esa.getIndexNameForType(type); 281 } 282 283 @Override 284 public String getWriteIndexName(String searchIndexName) { 285 return esa.getWriteIndexName(searchIndexName); 286 } 287 288 @Override 289 public void syncSearchAndWriteAlias(String searchIndexName) { 290 esa.syncSearchAndWriteAlias(searchIndexName); 291 } 292 293 @SuppressWarnings("deprecation") 294 @Override 295 public long getPendingWorkerCount() { 296 WorkManager wm = Framework.getService(WorkManager.class); 297 // api is deprecated for completed work 298 return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED); 299 } 300 301 @SuppressWarnings("deprecation") 302 @Override 303 public long getRunningWorkerCount() { 304 WorkManager wm = Framework.getService(WorkManager.class); 305 // api is deprecated for completed work 306 return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING); 307 } 308 309 @Override 310 public int getTotalCommandProcessed() { 311 return esa.getTotalCommandProcessed(); 312 } 313 314 @Override 315 public boolean isEmbedded() { 316 return esa.isEmbedded(); 317 } 318 319 @Override 320 public boolean useExternalVersion() { 321 return esa.useExternalVersion(); 322 } 323 324 @Override 325 public boolean isIndexingInProgress() { 326 return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0); 327 } 328 329 @Override 330 public ListenableFuture<Boolean> prepareWaitForIndexing() { 331 return waiterExecutorService.submit(() -> { 332 WorkManager wm = Framework.getService(WorkManager.class); 333 boolean completed; 334 do { 335 completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS); 336 } while (!completed); 337 return true; 338 }); 339 } 340 341 protected void initListenerThreadPool() { 342 waiterExecutorService = MoreExecutors.listeningDecorator( 343 Executors.newCachedThreadPool(new NamedThreadFactory())); 344 } 345 346 protected void shutdownListenerThreadPool() { 347 try { 348 waiterExecutorService.shutdown(); 349 } finally { 350 waiterExecutorService = null; 351 } 352 } 353 354 @Override 355 public void refresh() { 356 esa.refresh(); 357 } 358 359 @Override 360 public void refreshRepositoryIndex(String repositoryName) { 361 esa.refreshRepositoryIndex(repositoryName); 362 } 363 364 @Override 365 public void flush() { 366 esa.flush(); 367 } 368 369 @Override 370 public void flushRepositoryIndex(String repositoryName) { 371 esa.flushRepositoryIndex(repositoryName); 372 } 373 374 @Override 375 public void optimize() { 376 esa.optimize(); 377 } 378 379 @Override 380 public void optimizeRepositoryIndex(String repositoryName) { 381 esa.optimizeRepositoryIndex(repositoryName); 382 } 383 384 @Override 385 public void optimizeIndex(String indexName) { 386 esa.optimizeIndex(indexName); 387 } 388 389 @Override 390 public void indexNonRecursive(IndexingCommand cmd) { 391 indexNonRecursive(Collections.singletonList(cmd)); 392 } 393 394 // ES Indexing ============================================================= 395 396 @Override 397 public void indexNonRecursive(List<IndexingCommand> cmds) { 398 if (!isReady()) { 399 stackCommands(cmds); 400 return; 401 } 402 if (log.isDebugEnabled()) { 403 log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray())); 404 } 405 esi.indexNonRecursive(cmds); 406 } 407 408 protected void stackCommands(List<IndexingCommand> cmds) { 409 if (log.isDebugEnabled()) { 410 log.debug("Delaying indexing commands: Waiting for Index to be initialized." 411 + Arrays.toString(cmds.toArray())); 412 } 413 stackedCommands.addAll(cmds); 414 } 415 416 @Override 417 public void runIndexingWorker(List<IndexingCommand> cmds) { 418 if (!isReady()) { 419 stackCommands(cmds); 420 return; 421 } 422 runIndexingWorkerCount.incrementAndGet(); 423 try { 424 dispatchWork(cmds); 425 } finally { 426 runIndexingWorkerCount.decrementAndGet(); 427 } 428 } 429 430 /** 431 * Dispatch jobs between sync and async worker 432 */ 433 protected void dispatchWork(List<IndexingCommand> cmds) { 434 Map<String, List<IndexingCommand>> syncCommands = new HashMap<>(); 435 Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>(); 436 for (IndexingCommand cmd : cmds) { 437 if (cmd.isSync()) { 438 List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName()); 439 if (syncCmds == null) { 440 syncCmds = new ArrayList<>(); 441 } 442 syncCmds.add(cmd); 443 syncCommands.put(cmd.getRepositoryName(), syncCmds); 444 } else { 445 List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName()); 446 if (asyncCmds == null) { 447 asyncCmds = new ArrayList<>(); 448 } 449 asyncCmds.add(cmd); 450 asyncCommands.put(cmd.getRepositoryName(), asyncCmds); 451 } 452 } 453 runIndexingSyncWorker(syncCommands); 454 scheduleIndexingAsyncWorker(asyncCommands); 455 } 456 457 protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) { 458 if (asyncCommands.isEmpty()) { 459 return; 460 } 461 WorkManager wm = Framework.getService(WorkManager.class); 462 for (String repositoryName : asyncCommands.keySet()) { 463 IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName)); 464 // we are in afterCompletion don't wait for a commit 465 wm.schedule(idxWork, false); 466 } 467 } 468 469 protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) { 470 if (syncCommands.isEmpty()) { 471 return; 472 } 473 Transaction transaction = TransactionHelper.suspendTransaction(); 474 try { 475 for (String repositoryName : syncCommands.keySet()) { 476 IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName)); 477 idxWork.run(); 478 } 479 } finally { 480 if (transaction != null) { 481 TransactionHelper.resumeTransaction(transaction); 482 } 483 484 } 485 } 486 487 @Override 488 public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) { 489 if (nxql == null || nxql.isEmpty()) { 490 throw new IllegalArgumentException("Expecting an NXQL query"); 491 } 492 ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias); 493 WorkManager wm = Framework.getService(WorkManager.class); 494 wm.schedule(worker); 495 } 496 497 @Override 498 public void reindexRepository(String repositoryName) { 499 esa.dropAndInitRepositoryIndex(repositoryName, false); 500 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true); 501 } 502 503 // ES Search =============================================================== 504 @Override 505 public DocumentModelList query(NxQueryBuilder queryBuilder) { 506 return ess.query(queryBuilder); 507 } 508 509 @Override 510 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 511 return ess.queryAndAggregate(queryBuilder); 512 } 513 514 @Override 515 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 516 return ess.scroll(queryBuilder, keepAlive); 517 } 518 519 @Override 520 public EsScrollResult scroll(EsScrollResult scrollResult) { 521 return ess.scroll(scrollResult); 522 } 523 524 @Override 525 public void clearScroll(EsScrollResult scrollResult) { 526 ess.clearScroll(scrollResult); 527 } 528 529 @Deprecated 530 @Override 531 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 532 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 533 return query(query); 534 } 535 536 @Deprecated 537 @Override 538 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 539 SortInfo... sortInfos) { 540 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort( 541 sortInfos); 542 return query(query); 543 } 544 545 // misc ==================================================================== 546 protected boolean isReady() { 547 return (esa != null) && esa.isReady(); 548 } 549 550 protected static class NamedThreadFactory implements ThreadFactory { 551 @SuppressWarnings("NullableProblems") 552 @Override 553 public Thread newThread(Runnable r) { 554 return new Thread(r, "waitForEsIndexing"); 555 } 556 } 557 558}