001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020package org.nuxeo.elasticsearch; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.Executors; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import javax.transaction.Transaction; 041 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.elasticsearch.common.bytes.BytesReference; 045import org.elasticsearch.index.query.QueryBuilder; 046import org.nuxeo.ecm.core.api.CoreSession; 047import org.nuxeo.ecm.core.api.DocumentModel; 048import org.nuxeo.ecm.core.api.DocumentModelList; 049import org.nuxeo.ecm.core.api.NuxeoException; 050import org.nuxeo.ecm.core.api.SortInfo; 051import org.nuxeo.ecm.core.repository.RepositoryService; 052import org.nuxeo.ecm.core.work.api.Work; 053import org.nuxeo.ecm.core.work.api.WorkManager; 054import org.nuxeo.elasticsearch.api.ESClient; 055import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 056import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 057import org.nuxeo.elasticsearch.api.ElasticSearchService; 058import org.nuxeo.elasticsearch.api.EsResult; 059import org.nuxeo.elasticsearch.api.EsScrollResult; 060import org.nuxeo.elasticsearch.commands.IndexingCommand; 061import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 062import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor; 063import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 064import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 065import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl; 066import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl; 067import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl; 068import org.nuxeo.elasticsearch.io.JsonESDocumentWriter; 069import org.nuxeo.elasticsearch.query.NxQueryBuilder; 070import org.nuxeo.elasticsearch.work.IndexingWorker; 071import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker; 072import org.nuxeo.runtime.api.Framework; 073import org.nuxeo.runtime.model.ComponentContext; 074import org.nuxeo.runtime.model.ComponentInstance; 075import org.nuxeo.runtime.model.DefaultComponent; 076import org.nuxeo.runtime.transaction.TransactionHelper; 077 078import com.google.common.util.concurrent.ListenableFuture; 079import com.google.common.util.concurrent.ListeningExecutorService; 080import com.google.common.util.concurrent.MoreExecutors; 081 082/** 083 * Component used to configure and manage ElasticSearch integration 084 */ 085public class ElasticSearchComponent extends DefaultComponent 086 implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService { 087 088 protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class); 089 090 protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer"; 091 092 protected static final String EP_CLIENT_INIT = "elasticSearchClient"; 093 094 protected static final String EP_INDEX = "elasticSearchIndex"; 095 096 protected static final String EP_DOC_WRITER = "elasticSearchDocWriter"; 097 098 protected static final long REINDEX_TIMEOUT = 20; 099 100 // Indexing commands that where received before the index initialization 101 protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>()); 102 103 protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>(); 104 105 protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0); 106 107 protected ElasticSearchEmbeddedServerConfig embeddedServerConfig; 108 109 protected ElasticSearchClientConfig clientConfig; 110 111 protected ElasticSearchAdminImpl esa; 112 113 protected ElasticSearchIndexingImpl esi; 114 115 protected ElasticSearchServiceImpl ess; 116 117 protected JsonESDocumentWriter jsonESDocumentWriter; 118 119 protected ListeningExecutorService waiterExecutorService; 120 121 // Nuxeo Component impl ======================================é============= 122 @Override 123 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 124 switch (extensionPoint) { 125 case EP_EMBEDDED_SERVER: 126 ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution; 127 if (serverContrib.isEnabled()) { 128 embeddedServerConfig = serverContrib; 129 log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from " 130 + contributor.getName()); 131 } else if (embeddedServerConfig != null) { 132 log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName()); 133 embeddedServerConfig = null; 134 } 135 break; 136 case EP_CLIENT_INIT: 137 clientConfig = (ElasticSearchClientConfig) contribution; 138 break; 139 case EP_INDEX: 140 ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution; 141 ElasticSearchIndexConfig previous = indexConfig.get(idx.getName()); 142 if (idx.isEnabled()) { 143 idx.merge(previous); 144 indexConfig.put(idx.getName(), idx); 145 log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName()); 146 } else if (previous != null) { 147 log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName()); 148 indexConfig.remove(idx.getName()); 149 } 150 break; 151 case EP_DOC_WRITER: 152 ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution; 153 try { 154 jsonESDocumentWriter = writerDescriptor.getKlass().newInstance(); 155 } catch (IllegalAccessException | InstantiationException e) { 156 log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass()); 157 throw new NuxeoException(e); 158 } 159 break; 160 default: 161 throw new IllegalStateException("Invalid EP: " + extensionPoint); 162 } 163 } 164 165 @Override 166 public void start(ComponentContext context) { 167 if (!isElasticsearchEnabled()) { 168 log.info("Elasticsearch service is disabled"); 169 return; 170 } 171 esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig); 172 esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter); 173 ess = new ElasticSearchServiceImpl(esa); 174 initListenerThreadPool(); 175 processStackedCommands(); 176 reindexOnStartup(); 177 } 178 179 @Override 180 public void stop(ComponentContext context) { 181 if (esa == null) { 182 // Elasticsearch service was disabled 183 return; 184 } 185 try { 186 shutdownListenerThreadPool(); 187 } finally { 188 try { 189 esa.disconnect(); 190 } finally { 191 esa = null; 192 esi = null; 193 ess = null; 194 } 195 } 196 } 197 198 protected void reindexOnStartup() { 199 boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false")); 200 if (!reindexOnStartup) { 201 return; 202 } 203 for (String repositoryName : esa.getInitializedRepositories()) { 204 log.warn(String.format("Indexing repository: %s on startup", repositoryName)); 205 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 206 try { 207 prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS); 208 } catch (InterruptedException e) { 209 Thread.currentThread().interrupt(); 210 } catch (ExecutionException e) { 211 log.error(e.getMessage(), e); 212 } catch (TimeoutException e) { 213 log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background", 214 repositoryName, REINDEX_TIMEOUT)); 215 } 216 } 217 } 218 219 protected boolean isElasticsearchEnabled() { 220 return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true")); 221 } 222 223 @Override 224 public int getApplicationStartedOrder() { 225 RepositoryService component = (RepositoryService) Framework.getRuntime() 226 .getComponent( 227 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 228 return component.getApplicationStartedOrder() / 2; 229 } 230 231 void processStackedCommands() { 232 if (!stackedCommands.isEmpty()) { 233 log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size())); 234 runIndexingWorker(stackedCommands); 235 stackedCommands.clear(); 236 log.debug("Done"); 237 } 238 } 239 240 // Es Admin ================================================================ 241 242 @Override 243 public ESClient getClient() { 244 return esa.getClient(); 245 } 246 247 @Override 248 public void initIndexes(boolean dropIfExists) { 249 esa.initIndexes(dropIfExists); 250 } 251 252 @Override 253 public void dropAndInitIndex(String indexName) { 254 esa.dropAndInitIndex(indexName); 255 } 256 257 @Override 258 public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) { 259 esa.dropAndInitRepositoryIndex(repositoryName, syncAlias); 260 } 261 262 @Override 263 public List<String> getRepositoryNames() { 264 return esa.getRepositoryNames(); 265 } 266 267 @Override 268 public String getIndexNameForRepository(String repositoryName) { 269 return esa.getIndexNameForRepository(repositoryName); 270 } 271 272 @Override 273 public String getRepositoryForIndex(String indexName) { 274 return esa.getRepositoryForIndex(indexName); 275 } 276 277 @Override 278 public List<String> getIndexNamesForType(String type) { 279 return esa.getIndexNamesForType(type); 280 } 281 282 @Override 283 public String getIndexNameForType(String type) { 284 return esa.getIndexNameForType(type); 285 } 286 287 @Override 288 public String getWriteIndexName(String searchIndexName) { 289 return esa.getWriteIndexName(searchIndexName); 290 } 291 292 @Override 293 public void syncSearchAndWriteAlias(String searchIndexName) { 294 esa.syncSearchAndWriteAlias(searchIndexName); 295 } 296 297 @SuppressWarnings("deprecation") 298 @Override 299 public long getPendingWorkerCount() { 300 WorkManager wm = Framework.getService(WorkManager.class); 301 // api is deprecated for completed work 302 return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED); 303 } 304 305 @SuppressWarnings("deprecation") 306 @Override 307 public long getRunningWorkerCount() { 308 WorkManager wm = Framework.getService(WorkManager.class); 309 // api is deprecated for completed work 310 return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING); 311 } 312 313 @Override 314 public int getTotalCommandProcessed() { 315 return esa.getTotalCommandProcessed(); 316 } 317 318 @Override 319 public boolean isEmbedded() { 320 return esa.isEmbedded(); 321 } 322 323 @Override 324 public boolean useExternalVersion() { 325 return esa.useExternalVersion(); 326 } 327 328 @Override 329 public boolean isIndexingInProgress() { 330 return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0); 331 } 332 333 @Override 334 public ListenableFuture<Boolean> prepareWaitForIndexing() { 335 return waiterExecutorService.submit(() -> { 336 WorkManager wm = Framework.getService(WorkManager.class); 337 boolean completed; 338 do { 339 completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS); 340 } while (!completed); 341 return true; 342 }); 343 } 344 345 protected void initListenerThreadPool() { 346 waiterExecutorService = MoreExecutors.listeningDecorator( 347 Executors.newCachedThreadPool(new NamedThreadFactory())); 348 } 349 350 protected void shutdownListenerThreadPool() { 351 try { 352 waiterExecutorService.shutdown(); 353 } finally { 354 waiterExecutorService = null; 355 } 356 } 357 358 @Override 359 public void refresh() { 360 esa.refresh(); 361 } 362 363 @Override 364 public void refreshRepositoryIndex(String repositoryName) { 365 esa.refreshRepositoryIndex(repositoryName); 366 } 367 368 @Override 369 public void flush() { 370 esa.flush(); 371 } 372 373 @Override 374 public void flushRepositoryIndex(String repositoryName) { 375 esa.flushRepositoryIndex(repositoryName); 376 } 377 378 @Override 379 public void optimize() { 380 esa.optimize(); 381 } 382 383 @Override 384 public void optimizeRepositoryIndex(String repositoryName) { 385 esa.optimizeRepositoryIndex(repositoryName); 386 } 387 388 @Override 389 public void optimizeIndex(String indexName) { 390 esa.optimizeIndex(indexName); 391 } 392 393 @Override 394 public void indexNonRecursive(IndexingCommand cmd) { 395 indexNonRecursive(Collections.singletonList(cmd)); 396 } 397 398 // ES Indexing ============================================================= 399 400 @Override 401 public void indexNonRecursive(List<IndexingCommand> cmds) { 402 if (!isReady()) { 403 stackCommands(cmds); 404 return; 405 } 406 if (log.isDebugEnabled()) { 407 log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray())); 408 } 409 esi.indexNonRecursive(cmds); 410 } 411 412 protected void stackCommands(List<IndexingCommand> cmds) { 413 if (log.isDebugEnabled()) { 414 log.debug("Delaying indexing commands: Waiting for Index to be initialized." 415 + Arrays.toString(cmds.toArray())); 416 } 417 stackedCommands.addAll(cmds); 418 } 419 420 @Override 421 public void runIndexingWorker(List<IndexingCommand> cmds) { 422 if (!isReady()) { 423 stackCommands(cmds); 424 return; 425 } 426 runIndexingWorkerCount.incrementAndGet(); 427 try { 428 dispatchWork(cmds); 429 } finally { 430 runIndexingWorkerCount.decrementAndGet(); 431 } 432 } 433 434 /** 435 * Dispatch jobs between sync and async worker 436 */ 437 protected void dispatchWork(List<IndexingCommand> cmds) { 438 Map<String, List<IndexingCommand>> syncCommands = new HashMap<>(); 439 Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>(); 440 for (IndexingCommand cmd : cmds) { 441 if (cmd.isSync()) { 442 List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName()); 443 if (syncCmds == null) { 444 syncCmds = new ArrayList<>(); 445 } 446 syncCmds.add(cmd); 447 syncCommands.put(cmd.getRepositoryName(), syncCmds); 448 } else { 449 List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName()); 450 if (asyncCmds == null) { 451 asyncCmds = new ArrayList<>(); 452 } 453 asyncCmds.add(cmd); 454 asyncCommands.put(cmd.getRepositoryName(), asyncCmds); 455 } 456 } 457 runIndexingSyncWorker(syncCommands); 458 scheduleIndexingAsyncWorker(asyncCommands); 459 } 460 461 protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) { 462 if (asyncCommands.isEmpty()) { 463 return; 464 } 465 WorkManager wm = Framework.getService(WorkManager.class); 466 for (String repositoryName : asyncCommands.keySet()) { 467 IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName)); 468 // we are in afterCompletion don't wait for a commit 469 wm.schedule(idxWork, false); 470 } 471 } 472 473 protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) { 474 if (syncCommands.isEmpty()) { 475 return; 476 } 477 Transaction transaction = TransactionHelper.suspendTransaction(); 478 try { 479 for (String repositoryName : syncCommands.keySet()) { 480 IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName)); 481 idxWork.run(); 482 } 483 } finally { 484 if (transaction != null) { 485 TransactionHelper.resumeTransaction(transaction); 486 } 487 488 } 489 } 490 491 @Override 492 public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) { 493 if (nxql == null || nxql.isEmpty()) { 494 throw new IllegalArgumentException("Expecting an NXQL query"); 495 } 496 ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias); 497 WorkManager wm = Framework.getService(WorkManager.class); 498 wm.schedule(worker); 499 } 500 501 @Override 502 public void reindexRepository(String repositoryName) { 503 esa.dropAndInitRepositoryIndex(repositoryName, false); 504 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true); 505 } 506 507 @Override 508 public BytesReference source(DocumentModel doc) throws IOException { 509 return esi.source(doc); 510 } 511 512 // ES Search =============================================================== 513 @Override 514 public DocumentModelList query(NxQueryBuilder queryBuilder) { 515 return ess.query(queryBuilder); 516 } 517 518 @Override 519 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 520 return ess.queryAndAggregate(queryBuilder); 521 } 522 523 @Override 524 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 525 return ess.scroll(queryBuilder, keepAlive); 526 } 527 528 @Override 529 public EsScrollResult scroll(EsScrollResult scrollResult) { 530 return ess.scroll(scrollResult); 531 } 532 533 @Override 534 public void clearScroll(EsScrollResult scrollResult) { 535 ess.clearScroll(scrollResult); 536 } 537 538 @Deprecated 539 @Override 540 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 541 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 542 return query(query); 543 } 544 545 @Deprecated 546 @Override 547 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 548 SortInfo... sortInfos) { 549 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder) 550 .limit(limit) 551 .offset(offset) 552 .addSort(sortInfos); 553 return query(query); 554 } 555 556 // misc ==================================================================== 557 protected boolean isReady() { 558 return (esa != null) && esa.isReady(); 559 } 560 561 protected static class NamedThreadFactory implements ThreadFactory { 562 @SuppressWarnings("NullableProblems") 563 @Override 564 public Thread newThread(Runnable r) { 565 return new Thread(r, "waitForEsIndexing"); 566 } 567 } 568 569}