/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */
package org.nuxeo.elasticsearch;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.transaction.Transaction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.elasticsearch.api.ESClientInitializationService;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.api.EsScrollResult;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.config.ESClientInitializationDescriptor;
import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.elasticsearch.work.IndexingWorker;
import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * Component used to configure and manage ElasticSearch integration.
 * <p>
 * The component collects the contributed configurations, and once the application is started it creates the admin,
 * indexing and search implementations and delegates the corresponding API calls to them. Indexing commands received
 * before the service is ready are stacked and replayed at startup.
 */
public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
        ElasticSearchService {

    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);

    private static final String EP_REMOTE = "elasticSearchRemote";

    private static final String EP_LOCAL = "elasticSearchLocal";

    private static final String EP_INDEX = "elasticSearchIndex";

    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";

    private static final String EP_CLIENT_INIT = "elasticSearchClientInitialization";

    // Maximum time, in seconds, to block on startup for each repository reindexing
    private static final long REINDEX_TIMEOUT = 20;

    // Indexing commands that were received before the index initialization
    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());

    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();

    private ElasticSearchLocalConfig localConfig;

    private ElasticSearchRemoteConfig remoteConfig;

    private ElasticSearchAdminImpl esa;

    private ElasticSearchIndexingImpl esi;

    private ElasticSearchServiceImpl ess;

    protected JsonESDocumentWriter jsonESDocumentWriter;

    protected ESClientInitializationService clientInitService;

    private ListeningExecutorService waiterExecutorService;

    // Number of runIndexingWorker invocations currently dispatching commands
    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);

    // Nuxeo Component impl ====================================================

    /**
     * Registers a contribution made to one of the extension points of this component.
     * <p>
     * An enabled local configuration replaces any remote one and vice versa; a disabled contribution removes the
     * previously registered configuration of the same kind. Index configurations are merged with the previous
     * contribution having the same name.
     *
     * @param contribution the contributed descriptor
     * @param extensionPoint the name of the targeted extension point
     * @param contributor the component instance providing the contribution
     * @throws IllegalStateException if the extension point is unknown
     * @throws RuntimeException if a contributed document writer or client initialization class can not be
     *             instantiated
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        switch (extensionPoint) {
        case EP_LOCAL:
            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
            if (localContrib.isEnabled()) {
                localConfig = localContrib;
                remoteConfig = null;
                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
                        + contributor.getName());
            } else if (localConfig != null) {
                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
                localConfig = null;
            }
            break;
        case EP_REMOTE:
            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
            if (remoteContribution.isEnabled()) {
                remoteConfig = remoteContribution;
                localConfig = null;
                log.info(
                        "Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
            } else if (remoteConfig != null) {
                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
                remoteConfig = null;
            }
            break;
        case EP_INDEX:
            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
            if (idx.isEnabled()) {
                idx.merge(previous);
                indexConfig.put(idx.getName(), idx);
                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
            } else if (previous != null) {
                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
                indexConfig.remove(idx.getName());
            }
            break;
        case EP_DOC_WRITER:
            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
            try {
                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
            } catch (IllegalAccessException | InstantiationException e) {
                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
                throw new RuntimeException(e);
            }
            break;
        case EP_CLIENT_INIT:
            ESClientInitializationDescriptor clientInitDescriptor = (ESClientInitializationDescriptor) contribution;
            try {
                clientInitService = clientInitDescriptor.getKlass().newInstance();
                clientInitService.setUsername(clientInitDescriptor.getUsername());
                clientInitService.setPassword(clientInitDescriptor.getPassword());
            } catch (IllegalAccessException | InstantiationException e) {
                log.error(
                        "Can not instantiate ES Client initialization service from " + clientInitDescriptor.getKlass());
                throw new RuntimeException(e);
            }
            break;
        default:
            throw new IllegalStateException("Invalid EP: " + extensionPoint);
        }
    }

    /**
     * Creates the admin, indexing and search implementations from the registered configuration, then replays the
     * stacked indexing commands and optionally triggers a reindexing. Does nothing when Elasticsearch is disabled.
     */
    @Override
    public void applicationStarted(ComponentContext context) {
        if (!isElasticsearchEnabled()) {
            log.info("Elasticsearch service is disabled");
            return;
        }
        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig, clientInitService);
        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
        ess = new ElasticSearchServiceImpl(esa);
        initListenerThreadPool();
        processStackedCommands();
        reindexOnStartup();
    }

    /**
     * Schedules a full reindexing of every initialized repository when the reindex-on-startup property is set to
     * true, waiting at most {@link #REINDEX_TIMEOUT} seconds per repository before continuing.
     */
    private void reindexOnStartup() {
        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
        if (!reindexOnStartup) {
            return;
        }
        for (String repositoryName : esa.getInitializedRepositories()) {
            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
            try {
                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // restore the interrupt flag for the caller
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.error(e.getMessage(), e);
            } catch (TimeoutException e) {
                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
                        repositoryName, REINDEX_TIMEOUT));
            }
        }
    }

    /**
     * Tells whether Elasticsearch is enabled according to the framework property; enabled by default.
     */
    protected boolean isElasticsearchEnabled() {
        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
    }

    /**
     * Stops the waiter thread pool and disconnects the admin implementation, if any.
     */
    @Override
    public void deactivate(ComponentContext context) {
        try {
            shutdownListenerThreadPool();
        } finally {
            // always release the Elasticsearch connection, even if stopping the pool failed
            if (esa != null) {
                esa.disconnect();
            }
        }
    }

    /**
     * Returns half of the application started order of the repository service component.
     */
    @Override
    public int getApplicationStartedOrder() {
        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
        return component.getApplicationStartedOrder() / 2;
    }

    /**
     * Submits the indexing commands that were stacked while the service was not ready.
     * <p>
     * The stacked commands are copied and removed atomically, so that commands stacked concurrently are neither lost
     * nor observed while iterating. If the service is still not ready, the commands are stacked again by
     * {@link #runIndexingWorker(List)}.
     */
    void processStackedCommands() {
        List<IndexingCommand> pending;
        // a synchronized list must be locked manually for compound operations
        synchronized (stackedCommands) {
            if (stackedCommands.isEmpty()) {
                return;
            }
            pending = new ArrayList<>(stackedCommands);
            stackedCommands.clear();
        }
        log.info(String.format("Processing %d indexing commands stacked during startup", pending.size()));
        runIndexingWorker(pending);
        log.debug("Done");
    }

    // Es Admin ================================================================

    /** Delegates to the admin implementation. */
    @Override
    public Client getClient() {
        return esa.getClient();
    }

    /** Delegates to the admin implementation. */
    @Override
    public void initIndexes(boolean dropIfExists) {
        esa.initIndexes(dropIfExists);
    }

    /** Delegates to the admin implementation. */
    @Override
    public void dropAndInitIndex(String indexName) {
        esa.dropAndInitIndex(indexName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public void dropAndInitRepositoryIndex(String repositoryName) {
        esa.dropAndInitRepositoryIndex(repositoryName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public List<String> getRepositoryNames() {
        return esa.getRepositoryNames();
    }

    /** Delegates to the admin implementation. */
    @Override
    public String getIndexNameForRepository(String repositoryName) {
        return esa.getIndexNameForRepository(repositoryName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public List<String> getIndexNamesForType(String type) {
        return esa.getIndexNamesForType(type);
    }

    /** Delegates to the admin implementation. */
    @Override
    public String getIndexNameForType(String type) {
        return esa.getIndexNameForType(type);
    }

    /**
     * Returns the number of works in the indexing queue that are in the scheduled state.
     */
    @SuppressWarnings("deprecation")
    @Override
    public long getPendingWorkerCount() {
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        // api is deprecated for completed work
        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
    }

    /**
     * Returns the number of in-progress {@link #runIndexingWorker(List)} invocations plus the number of works in the
     * indexing queue that are in the running state.
     */
    @SuppressWarnings("deprecation")
    @Override
    public long getRunningWorkerCount() {
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        // api is deprecated for completed work
        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
    }

    /** Delegates to the admin implementation. */
    @Override
    public int getTotalCommandProcessed() {
        return esa.getTotalCommandProcessed();
    }

    /** Delegates to the admin implementation. */
    @Override
    public boolean isEmbedded() {
        return esa.isEmbedded();
    }

    /** Delegates to the admin implementation. */
    @Override
    public boolean useExternalVersion() {
        return esa.useExternalVersion();
    }

    /**
     * Tells whether commands are being dispatched or indexing works are scheduled or running.
     */
    @Override
    public boolean isIndexingInProgress() {
        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
    }

    /**
     * Submits to the waiter thread pool a task that repeatedly awaits the completion of the indexing queue, 300
     * seconds at a time, until the work manager reports completion.
     *
     * @return a future yielding {@code true} once the indexing queue has completed
     */
    @Override
    public ListenableFuture<Boolean> prepareWaitForIndexing() {
        return waiterExecutorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                WorkManager wm = Framework.getLocalService(WorkManager.class);
                boolean completed = false;
                do {
                    completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
                } while (!completed);
                return true;
            }
        });
    }

    /**
     * Thread factory giving a recognizable name to the threads waiting for the indexing completion.
     */
    private static class NamedThreadFactory implements ThreadFactory {
        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "waitForEsIndexing");
        }
    }

    /**
     * Creates the thread pool used by {@link #prepareWaitForIndexing()}.
     */
    protected void initListenerThreadPool() {
        waiterExecutorService = MoreExecutors.listeningDecorator(
                Executors.newCachedThreadPool(new NamedThreadFactory()));
    }

    /**
     * Stops the thread pool used by {@link #prepareWaitForIndexing()}, if it was created.
     * <p>
     * Running waiter tasks are interrupted, otherwise they would keep looping until the indexing queue completes and
     * their threads would outlive the component.
     */
    protected void shutdownListenerThreadPool() {
        if (waiterExecutorService != null) {
            waiterExecutorService.shutdownNow();
        }
    }

    /** Delegates to the admin implementation. */
    @Override
    public void refresh() {
        esa.refresh();
    }

    /** Delegates to the admin implementation. */
    @Override
    public void refreshRepositoryIndex(String repositoryName) {
        esa.refreshRepositoryIndex(repositoryName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public void flush() {
        esa.flush();
    }

    /** Delegates to the admin implementation. */
    @Override
    public void flushRepositoryIndex(String repositoryName) {
        esa.flushRepositoryIndex(repositoryName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public void optimize() {
        esa.optimize();
    }

    /** Delegates to the admin implementation. */
    @Override
    public void optimizeRepositoryIndex(String repositoryName) {
        esa.optimizeRepositoryIndex(repositoryName);
    }

    /** Delegates to the admin implementation. */
    @Override
    public void optimizeIndex(String indexName) {
        esa.optimizeIndex(indexName);
    }

    // ES Indexing =============================================================

    /**
     * Processes a single indexing command, see {@link #indexNonRecursive(List)}.
     */
    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        indexNonRecursive(Collections.singletonList(cmd));
    }

    /**
     * Hands the commands over to the indexing implementation, or stacks them when the service is not ready yet.
     */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
        }
        esi.indexNonRecursive(cmds);
    }

    /**
     * Keeps the commands aside until {@link #processStackedCommands()} is called.
     */
    protected void stackCommands(List<IndexingCommand> cmds) {
        if (log.isDebugEnabled()) {
            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
                    + Arrays.toString(cmds.toArray()));
        }
        stackedCommands.addAll(cmds);
    }

    /**
     * Dispatches the commands to synchronous and asynchronous workers, or stacks them when the service is not ready
     * yet. The invocation is counted as a running worker for the duration of the dispatch.
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        runIndexingWorkerCount.incrementAndGet();
        try {
            dispatchWork(cmds);
        } finally {
            runIndexingWorkerCount.decrementAndGet();
        }
    }

    /**
     * Dispatch jobs between sync and async worker.
     * <p>
     * Commands are grouped by repository name; the synchronous groups are run first, then the asynchronous groups
     * are scheduled.
     */
    protected void dispatchWork(List<IndexingCommand> cmds) {
        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
        for (IndexingCommand cmd : cmds) {
            Map<String, List<IndexingCommand>> target = cmd.isSync() ? syncCommands : asyncCommands;
            target.computeIfAbsent(cmd.getRepositoryName(), name -> new ArrayList<>()).add(cmd);
        }
        runIndexingSyncWorker(syncCommands);
        scheduleIndexingAsyncWorker(asyncCommands);
    }

    /**
     * Schedules one indexing work per repository on the work manager.
     *
     * @param asyncCommands the asynchronous commands, keyed by repository name
     */
    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
        if (asyncCommands.isEmpty()) {
            return;
        }
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        for (Map.Entry<String, List<IndexingCommand>> entry : asyncCommands.entrySet()) {
            IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
            // we are in afterCompletion don't wait for a commit
            wm.schedule(idxWork, false);
        }
    }

    /**
     * Runs one indexing work per repository in the calling thread, with the current transaction suspended for the
     * duration of the run.
     *
     * @param syncCommands the synchronous commands, keyed by repository name
     */
    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
        if (syncCommands.isEmpty()) {
            return;
        }
        Transaction transaction = TransactionHelper.suspendTransaction();
        try {
            for (Map.Entry<String, List<IndexingCommand>> entry : syncCommands.entrySet()) {
                IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
                idxWork.run();
            }
        } finally {
            if (transaction != null) {
                TransactionHelper.resumeTransaction(transaction);
            }
        }
    }

    /**
     * Schedules a scrolling indexing work for the documents of the repository matching the query.
     *
     * @param repositoryName the repository to reindex
     * @param nxql the NXQL query selecting the documents
     * @throws IllegalArgumentException if the query is null or empty
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql) {
        if (nxql == null || nxql.isEmpty()) {
            throw new IllegalArgumentException("Expecting an NXQL query");
        }
        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        wm.schedule(worker);
    }

    // ES Search ===============================================================

    /** Delegates to the search implementation. */
    @Override
    public DocumentModelList query(NxQueryBuilder queryBuilder) {
        return ess.query(queryBuilder);
    }

    /** Delegates to the search implementation. */
    @Override
    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
        return ess.queryAndAggregate(queryBuilder);
    }

    /** Delegates to the search implementation. */
    @Override
    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
        return ess.scroll(queryBuilder, keepAlive);
    }

    /** Delegates to the search implementation. */
    @Override
    public EsScrollResult scroll(EsScrollResult scrollResult) {
        return ess.scroll(scrollResult);
    }

    /** Delegates to the search implementation. */
    @Override
    public void clearScroll(EsScrollResult scrollResult) {
        ess.clearScroll(scrollResult);
    }

    /**
     * Builds an {@link NxQueryBuilder} from the NXQL query and runs it.
     *
     * @deprecated use {@link #query(NxQueryBuilder)} instead
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
        return query(query);
    }

    /**
     * Builds an {@link NxQueryBuilder} from the Elasticsearch query and runs it.
     *
     * @deprecated use {@link #query(NxQueryBuilder)} instead
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
            SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
                                                          .limit(limit)
                                                          .offset(offset)
                                                          .addSort(sortInfos);
        return query(query);
    }

    // misc ====================================================================

    /**
     * Tells whether the admin implementation has been created and reports itself as ready.
     */
    private boolean isReady() {
        return (esa != null) && esa.isReady();
    }

}