001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020package org.nuxeo.elasticsearch; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 025 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import javax.transaction.Transaction; 040 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.elasticsearch.client.Client; 044import org.elasticsearch.index.query.QueryBuilder; 045import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 046import org.nuxeo.ecm.core.api.CoreSession; 047import org.nuxeo.ecm.core.api.DocumentModelList; 048import org.nuxeo.ecm.core.api.SortInfo; 049import org.nuxeo.ecm.core.repository.RepositoryService; 050import org.nuxeo.ecm.core.work.api.Work; 051import org.nuxeo.ecm.core.work.api.WorkManager; 052import org.nuxeo.elasticsearch.api.ESClientInitializationService; 053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 055import org.nuxeo.elasticsearch.api.ElasticSearchService; 056import org.nuxeo.elasticsearch.api.EsResult; 057import org.nuxeo.elasticsearch.api.EsScrollResult; 058import org.nuxeo.elasticsearch.commands.IndexingCommand; 059import org.nuxeo.elasticsearch.config.ESClientInitializationDescriptor; 060import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor; 061import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 062import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 063import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 064import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl; 065import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl; 066import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl; 067import org.nuxeo.elasticsearch.query.NxQueryBuilder; 068import org.nuxeo.elasticsearch.work.IndexingWorker; 069import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker; 070import org.nuxeo.runtime.api.Framework; 071import org.nuxeo.runtime.model.ComponentContext; 072import org.nuxeo.runtime.model.ComponentInstance; 073import org.nuxeo.runtime.model.DefaultComponent; 074import org.nuxeo.runtime.transaction.TransactionHelper; 075 076import com.google.common.util.concurrent.ListenableFuture; 077import com.google.common.util.concurrent.ListeningExecutorService; 078import com.google.common.util.concurrent.MoreExecutors; 079 080/** 081 * Component used to configure and manage ElasticSearch integration 082 */ 083public class ElasticSearchComponent extends DefaultComponent 084 implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService { 085 086 private static final Log log = LogFactory.getLog(ElasticSearchComponent.class); 087 088 private static final String EP_REMOTE = "elasticSearchRemote"; 089 090 private static final String EP_LOCAL = "elasticSearchLocal"; 091 092 private static final String EP_INDEX = "elasticSearchIndex"; 093 094 private static final String EP_DOC_WRITER = "elasticSearchDocWriter"; 095 096 private static final String EP_CLIENT_INIT = "elasticSearchClientInitialization"; 097 098 private static final long REINDEX_TIMEOUT = 20; 099 100 // Indexing commands that where received before the index initialization 101 private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>()); 102 103 private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>(); 104 105 private ElasticSearchLocalConfig localConfig; 106 107 private ElasticSearchRemoteConfig remoteConfig; 108 109 private ElasticSearchAdminImpl esa; 110 111 private ElasticSearchIndexingImpl esi; 112 113 private ElasticSearchServiceImpl ess; 114 115 protected JsonESDocumentWriter jsonESDocumentWriter; 116 117 protected ESClientInitializationService clientInitService; 118 119 private ListeningExecutorService waiterExecutorService; 120 121 private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0); 122 123 // Nuxeo Component impl ======================================é============= 124 @Override 125 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 126 switch (extensionPoint) { 127 case EP_LOCAL: 128 ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution; 129 if (localContrib.isEnabled()) { 130 localConfig = localContrib; 131 remoteConfig = null; 132 log.info("Registering local embedded configuration: " + localConfig + ", loaded from " 133 + contributor.getName()); 134 } else if (localConfig != null) { 135 log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName()); 136 localConfig = null; 137 } 138 break; 139 case EP_REMOTE: 140 ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution; 141 if (remoteContribution.isEnabled()) { 142 remoteConfig = remoteContribution; 143 localConfig = null; 144 log.info( 145 "Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName()); 146 } else if (remoteConfig != null) { 147 log.info("Disabling previous remote configuration, deactivated by " + contributor.getName()); 148 remoteConfig = null; 149 } 150 break; 151 case EP_INDEX: 152 ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution; 153 ElasticSearchIndexConfig previous = indexConfig.get(idx.getName()); 154 if (idx.isEnabled()) { 155 idx.merge(previous); 156 indexConfig.put(idx.getName(), idx); 157 log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName()); 158 } else if (previous != null) { 159 log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName()); 160 indexConfig.remove(idx.getName()); 161 } 162 break; 163 case EP_DOC_WRITER: 164 ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution; 165 try { 166 jsonESDocumentWriter = writerDescriptor.getKlass().newInstance(); 167 } catch (IllegalAccessException | InstantiationException e) { 168 log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass()); 169 throw new RuntimeException(e); 170 } 171 break; 172 case EP_CLIENT_INIT: 173 ESClientInitializationDescriptor clientInitDescriptor = (ESClientInitializationDescriptor) contribution; 174 try { 175 clientInitService = clientInitDescriptor.getKlass().newInstance(); 176 clientInitService.setUsername(clientInitDescriptor.getUsername()); 177 clientInitService.setPassword(clientInitDescriptor.getPassword()); 178 clientInitService.setSslKeystorePath(clientInitDescriptor.getSslKeystorePath()); 179 clientInitService.setSslKeystorePassword(clientInitDescriptor.getSslKeystorePassword()); 180 } catch (IllegalAccessException | InstantiationException e) { 181 log.error( 182 "Can not instantiate ES Client initialization service from " + clientInitDescriptor.getKlass()); 183 throw new RuntimeException(e); 184 } 185 break; 186 default: 187 throw new IllegalStateException("Invalid EP: " + extensionPoint); 188 } 189 } 190 191 @Override 192 public void start(ComponentContext context) { 193 if (!isElasticsearchEnabled()) { 194 log.info("Elasticsearch service is disabled"); 195 return; 196 } 197 esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig, clientInitService); 198 esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter); 199 ess = new ElasticSearchServiceImpl(esa); 200 initListenerThreadPool(); 201 processStackedCommands(); 202 reindexOnStartup(); 203 } 204 205 @Override 206 public void stop(ComponentContext context) { 207 try { 208 shutdownListenerThreadPool(); 209 } finally { 210 try { 211 esa.disconnect(); 212 } finally { 213 esa = null; 214 esi = null; 215 ess = null; 216 } 217 } 218 } 219 220 private void reindexOnStartup() { 221 boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false")); 222 if (!reindexOnStartup) { 223 return; 224 } 225 for (String repositoryName : esa.getInitializedRepositories()) { 226 log.warn(String.format("Indexing repository: %s on startup", repositoryName)); 227 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 228 try { 229 prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS); 230 } catch (InterruptedException e) { 231 Thread.currentThread().interrupt(); 232 } catch (ExecutionException e) { 233 log.error(e.getMessage(), e); 234 } catch (TimeoutException e) { 235 log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background", 236 repositoryName, REINDEX_TIMEOUT)); 237 } 238 } 239 } 240 241 protected boolean isElasticsearchEnabled() { 242 return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true")); 243 } 244 245 @Override 246 public int getApplicationStartedOrder() { 247 RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent( 248 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 249 return component.getApplicationStartedOrder() / 2; 250 } 251 252 void processStackedCommands() { 253 if (!stackedCommands.isEmpty()) { 254 log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size())); 255 runIndexingWorker(stackedCommands); 256 stackedCommands.clear(); 257 log.debug("Done"); 258 } 259 } 260 261 // Es Admin ================================================================ 262 263 @Override 264 public Client getClient() { 265 return esa.getClient(); 266 } 267 268 @Override 269 public void initIndexes(boolean dropIfExists) { 270 esa.initIndexes(dropIfExists); 271 } 272 273 @Override 274 public void dropAndInitIndex(String indexName) { 275 esa.dropAndInitIndex(indexName); 276 } 277 278 @Override 279 public void dropAndInitRepositoryIndex(String repositoryName) { 280 esa.dropAndInitRepositoryIndex(repositoryName); 281 } 282 283 @Override 284 public List<String> getRepositoryNames() { 285 return esa.getRepositoryNames(); 286 } 287 288 @Override 289 public String getIndexNameForRepository(String repositoryName) { 290 return esa.getIndexNameForRepository(repositoryName); 291 } 292 293 @Override 294 public List<String> getIndexNamesForType(String type) { 295 return esa.getIndexNamesForType(type); 296 } 297 298 @Override 299 public String getIndexNameForType(String type) { 300 return esa.getIndexNameForType(type); 301 } 302 303 @SuppressWarnings("deprecation") 304 @Override 305 public long getPendingWorkerCount() { 306 WorkManager wm = Framework.getLocalService(WorkManager.class); 307 // api is deprecated for completed work 308 return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED); 309 } 310 311 @SuppressWarnings("deprecation") 312 @Override 313 public long getRunningWorkerCount() { 314 WorkManager wm = Framework.getLocalService(WorkManager.class); 315 // api is deprecated for completed work 316 return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING); 317 } 318 319 @Override 320 public int getTotalCommandProcessed() { 321 return esa.getTotalCommandProcessed(); 322 } 323 324 @Override 325 public boolean isEmbedded() { 326 return esa.isEmbedded(); 327 } 328 329 @Override 330 public boolean useExternalVersion() { 331 return esa.useExternalVersion(); 332 } 333 334 @Override 335 public boolean isIndexingInProgress() { 336 return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0); 337 } 338 339 @Override 340 public ListenableFuture<Boolean> prepareWaitForIndexing() { 341 return waiterExecutorService.submit(() -> { 342 WorkManager wm = Framework.getLocalService(WorkManager.class); 343 boolean completed = false; 344 do { 345 completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS); 346 } while (!completed); 347 return true; 348 }); 349 } 350 351 private static class NamedThreadFactory implements ThreadFactory { 352 @SuppressWarnings("NullableProblems") 353 @Override 354 public Thread newThread(Runnable r) { 355 return new Thread(r, "waitForEsIndexing"); 356 } 357 } 358 359 protected void initListenerThreadPool() { 360 waiterExecutorService = MoreExecutors.listeningDecorator( 361 Executors.newCachedThreadPool(new NamedThreadFactory())); 362 } 363 364 protected void shutdownListenerThreadPool() { 365 try { 366 waiterExecutorService.shutdown(); 367 } finally { 368 waiterExecutorService = null; 369 } 370 } 371 372 @Override 373 public void refresh() { 374 esa.refresh(); 375 } 376 377 @Override 378 public void refreshRepositoryIndex(String repositoryName) { 379 esa.refreshRepositoryIndex(repositoryName); 380 } 381 382 @Override 383 public void flush() { 384 esa.flush(); 385 } 386 387 @Override 388 public void flushRepositoryIndex(String repositoryName) { 389 esa.flushRepositoryIndex(repositoryName); 390 } 391 392 @Override 393 public void optimize() { 394 esa.optimize(); 395 } 396 397 @Override 398 public void optimizeRepositoryIndex(String repositoryName) { 399 esa.optimizeRepositoryIndex(repositoryName); 400 } 401 402 @Override 403 public void optimizeIndex(String indexName) { 404 esa.optimizeIndex(indexName); 405 } 406 407 // ES Indexing ============================================================= 408 409 @Override 410 public void indexNonRecursive(IndexingCommand cmd) { 411 indexNonRecursive(Collections.singletonList(cmd)); 412 } 413 414 @Override 415 public void indexNonRecursive(List<IndexingCommand> cmds) { 416 if (!isReady()) { 417 stackCommands(cmds); 418 return; 419 } 420 if (log.isDebugEnabled()) { 421 log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray())); 422 } 423 esi.indexNonRecursive(cmds); 424 } 425 426 protected void stackCommands(List<IndexingCommand> cmds) { 427 if (log.isDebugEnabled()) { 428 log.debug("Delaying indexing commands: Waiting for Index to be initialized." 429 + Arrays.toString(cmds.toArray())); 430 } 431 stackedCommands.addAll(cmds); 432 } 433 434 @Override 435 public void runIndexingWorker(List<IndexingCommand> cmds) { 436 if (!isReady()) { 437 stackCommands(cmds); 438 return; 439 } 440 runIndexingWorkerCount.incrementAndGet(); 441 try { 442 dispatchWork(cmds); 443 } finally { 444 runIndexingWorkerCount.decrementAndGet(); 445 } 446 } 447 448 /** 449 * Dispatch jobs between sync and async worker 450 */ 451 protected void dispatchWork(List<IndexingCommand> cmds) { 452 Map<String, List<IndexingCommand>> syncCommands = new HashMap<>(); 453 Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>(); 454 for (IndexingCommand cmd : cmds) { 455 if (cmd.isSync()) { 456 List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName()); 457 if (syncCmds == null) { 458 syncCmds = new ArrayList<>(); 459 } 460 syncCmds.add(cmd); 461 syncCommands.put(cmd.getRepositoryName(), syncCmds); 462 } else { 463 List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName()); 464 if (asyncCmds == null) { 465 asyncCmds = new ArrayList<>(); 466 } 467 asyncCmds.add(cmd); 468 asyncCommands.put(cmd.getRepositoryName(), asyncCmds); 469 } 470 } 471 runIndexingSyncWorker(syncCommands); 472 scheduleIndexingAsyncWorker(asyncCommands); 473 } 474 475 protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) { 476 if (asyncCommands.isEmpty()) { 477 return; 478 } 479 WorkManager wm = Framework.getLocalService(WorkManager.class); 480 for (String repositoryName : asyncCommands.keySet()) { 481 IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName)); 482 // we are in afterCompletion don't wait for a commit 483 wm.schedule(idxWork, false); 484 } 485 } 486 487 protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) { 488 if (syncCommands.isEmpty()) { 489 return; 490 } 491 Transaction transaction = TransactionHelper.suspendTransaction(); 492 try { 493 for (String repositoryName : syncCommands.keySet()) { 494 IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName)); 495 idxWork.run(); 496 } 497 } finally { 498 if (transaction != null) { 499 TransactionHelper.resumeTransaction(transaction); 500 } 501 502 } 503 } 504 505 @Override 506 public void runReindexingWorker(String repositoryName, String nxql) { 507 if (nxql == null || nxql.isEmpty()) { 508 throw new IllegalArgumentException("Expecting an NXQL query"); 509 } 510 ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql); 511 WorkManager wm = Framework.getLocalService(WorkManager.class); 512 wm.schedule(worker); 513 } 514 515 // ES Search =============================================================== 516 @Override 517 public DocumentModelList query(NxQueryBuilder queryBuilder) { 518 return ess.query(queryBuilder); 519 } 520 521 @Override 522 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 523 return ess.queryAndAggregate(queryBuilder); 524 } 525 526 @Override 527 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 528 return ess.scroll(queryBuilder, keepAlive); 529 } 530 531 @Override 532 public EsScrollResult scroll(EsScrollResult scrollResult) { 533 return ess.scroll(scrollResult); 534 } 535 536 @Override 537 public void clearScroll(EsScrollResult scrollResult) { 538 ess.clearScroll(scrollResult); 539 } 540 541 @Deprecated 542 @Override 543 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 544 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 545 return query(query); 546 } 547 548 @Deprecated 549 @Override 550 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 551 SortInfo... sortInfos) { 552 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder) 553 .limit(limit) 554 .offset(offset) 555 .addSort(sortInfos); 556 return query(query); 557 } 558 559 // misc ==================================================================== 560 private boolean isReady() { 561 return (esa != null) && esa.isReady(); 562 } 563 564}