001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020package org.nuxeo.elasticsearch; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 025 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.Callable; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.Executors; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import javax.transaction.Transaction; 041 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.elasticsearch.client.Client; 045import org.elasticsearch.index.query.QueryBuilder; 046import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.core.api.DocumentModelList; 049import org.nuxeo.ecm.core.api.SortInfo; 050import org.nuxeo.ecm.core.repository.RepositoryService; 051import org.nuxeo.ecm.core.work.api.Work; 052import org.nuxeo.ecm.core.work.api.WorkManager; 053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 055import org.nuxeo.elasticsearch.api.ElasticSearchService; 056import org.nuxeo.elasticsearch.api.EsResult; 057import org.nuxeo.elasticsearch.api.EsScrollResult; 058import org.nuxeo.elasticsearch.commands.IndexingCommand; 059import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor; 060import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 061import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 062import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 063import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl; 064import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl; 065import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl; 066import org.nuxeo.elasticsearch.query.NxQueryBuilder; 067import org.nuxeo.elasticsearch.work.IndexingWorker; 068import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker; 069import org.nuxeo.runtime.api.Framework; 070import org.nuxeo.runtime.model.ComponentContext; 071import org.nuxeo.runtime.model.ComponentInstance; 072import org.nuxeo.runtime.model.DefaultComponent; 073import org.nuxeo.runtime.transaction.TransactionHelper; 074 075import com.google.common.util.concurrent.ListenableFuture; 076import com.google.common.util.concurrent.ListeningExecutorService; 077import com.google.common.util.concurrent.MoreExecutors; 078 079/** 080 * Component used to configure and manage ElasticSearch integration 081 */ 082public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing, 083 ElasticSearchService { 084 085 private static final Log log = LogFactory.getLog(ElasticSearchComponent.class); 086 087 private static final String EP_REMOTE = "elasticSearchRemote"; 088 089 private static final String EP_LOCAL = "elasticSearchLocal"; 090 091 private static final String EP_INDEX = "elasticSearchIndex"; 092 093 private static final String EP_DOC_WRITER = "elasticSearchDocWriter"; 094 095 private static final long REINDEX_TIMEOUT = 20; 096 097 // Indexing commands that where received before the index initialization 098 private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>()); 099 100 private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>(); 101 102 private ElasticSearchLocalConfig localConfig; 103 104 private ElasticSearchRemoteConfig remoteConfig; 105 106 private ElasticSearchAdminImpl esa; 107 108 private ElasticSearchIndexingImpl esi; 109 110 private ElasticSearchServiceImpl ess; 111 112 protected JsonESDocumentWriter jsonESDocumentWriter; 113 114 private ListeningExecutorService waiterExecutorService; 115 116 private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0); 117 118 // Nuxeo Component impl ======================================é============= 119 @Override 120 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 121 switch (extensionPoint) { 122 case EP_LOCAL: 123 ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution; 124 if (localContrib.isEnabled()) { 125 localConfig = localContrib; 126 remoteConfig = null; 127 log.info("Registering local embedded configuration: " + localConfig + ", loaded from " 128 + contributor.getName()); 129 } else if (localConfig != null) { 130 log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName()); 131 localConfig = null; 132 } 133 break; 134 case EP_REMOTE: 135 ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution; 136 if (remoteContribution.isEnabled()) { 137 remoteConfig = remoteContribution; 138 localConfig = null; 139 log.info("Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName()); 140 } else if (remoteConfig != null) { 141 log.info("Disabling previous remote configuration, deactivated by " + contributor.getName()); 142 remoteConfig = null; 143 } 144 break; 145 case EP_INDEX: 146 ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution; 147 ElasticSearchIndexConfig previous = indexConfig.get(idx.getName()); 148 if (idx.isEnabled()) { 149 idx.merge(previous); 150 indexConfig.put(idx.getName(), idx); 151 log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName()); 152 } else if (previous != null) { 153 log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName()); 154 indexConfig.remove(idx.getName()); 155 } 156 break; 157 case EP_DOC_WRITER: 158 ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution; 159 try { 160 jsonESDocumentWriter = writerDescriptor.getKlass().newInstance(); 161 } catch (IllegalAccessException | InstantiationException e) { 162 log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass()); 163 throw new RuntimeException(e); 164 } 165 break; 166 default: 167 throw new IllegalStateException("Invalid EP: " + extensionPoint); 168 } 169 } 170 171 @Override 172 public void applicationStarted(ComponentContext context) { 173 if (!isElasticsearchEnabled()) { 174 log.info("Elasticsearch service is disabled"); 175 return; 176 } 177 esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig); 178 esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter); 179 ess = new ElasticSearchServiceImpl(esa); 180 initListenerThreadPool(); 181 processStackedCommands(); 182 reindexOnStartup(); 183 } 184 185 private void reindexOnStartup() { 186 boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false")); 187 if (!reindexOnStartup) { 188 return; 189 } 190 for (String repositoryName : esa.getInitializedRepositories()) { 191 log.warn(String.format("Indexing repository: %s on startup", repositoryName)); 192 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 193 try { 194 prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS); 195 } catch (InterruptedException e) { 196 Thread.currentThread().interrupt(); 197 } catch (ExecutionException e) { 198 log.error(e.getMessage(), e); 199 } catch (TimeoutException e) { 200 log.warn(String.format("Indexation of repository %s not finised after %d s, continuing in background", 201 repositoryName, REINDEX_TIMEOUT)); 202 } 203 } 204 } 205 206 protected boolean isElasticsearchEnabled() { 207 return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true")); 208 } 209 210 @Override 211 public void deactivate(ComponentContext context) { 212 if (esa != null) { 213 esa.disconnect(); 214 } 215 } 216 217 @Override 218 public int getApplicationStartedOrder() { 219 RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent( 220 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 221 return component.getApplicationStartedOrder() / 2; 222 } 223 224 void processStackedCommands() { 225 if (!stackedCommands.isEmpty()) { 226 log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size())); 227 runIndexingWorker(stackedCommands); 228 stackedCommands.clear(); 229 log.debug("Done"); 230 } 231 } 232 233 // Es Admin ================================================================ 234 235 @Override 236 public Client getClient() { 237 return esa.getClient(); 238 } 239 240 @Override 241 public void initIndexes(boolean dropIfExists) { 242 esa.initIndexes(dropIfExists); 243 } 244 245 @Override 246 public void dropAndInitIndex(String indexName) { 247 esa.dropAndInitIndex(indexName); 248 } 249 250 @Override 251 public void dropAndInitRepositoryIndex(String repositoryName) { 252 esa.dropAndInitRepositoryIndex(repositoryName); 253 } 254 255 @Override 256 public List<String> getRepositoryNames() { 257 return esa.getRepositoryNames(); 258 } 259 260 @Override 261 public String getIndexNameForRepository(String repositoryName) { 262 return esa.getIndexNameForRepository(repositoryName); 263 } 264 265 @Override 266 public List<String> getIndexNamesForType(String type) { 267 return esa.getIndexNamesForType(type); 268 } 269 270 @Override 271 public String getIndexNameForType(String type) { 272 return esa.getIndexNameForType(type); 273 } 274 275 @Override 276 public long getPendingWorkerCount() { 277 WorkManager wm = Framework.getLocalService(WorkManager.class); 278 return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED); 279 } 280 281 @Override 282 public long getRunningWorkerCount() { 283 WorkManager wm = Framework.getLocalService(WorkManager.class); 284 return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING); 285 } 286 287 @Override 288 public int getTotalCommandProcessed() { 289 return esa.getTotalCommandProcessed(); 290 } 291 292 @Override 293 public boolean isEmbedded() { 294 return esa.isEmbedded(); 295 } 296 297 @Override 298 public boolean useExternalVersion() { 299 return esa.useExternalVersion(); 300 } 301 302 @Override 303 public boolean isIndexingInProgress() { 304 return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0); 305 } 306 307 @Override 308 public ListenableFuture<Boolean> prepareWaitForIndexing() { 309 return waiterExecutorService.submit(new Callable<Boolean>() { 310 @Override 311 public Boolean call() throws Exception { 312 WorkManager wm = Framework.getLocalService(WorkManager.class); 313 boolean completed = false; 314 do { 315 completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS); 316 } while (!completed); 317 return true; 318 } 319 }); 320 } 321 322 private static class NamedThreadFactory implements ThreadFactory { 323 @SuppressWarnings("NullableProblems") 324 @Override 325 public Thread newThread(Runnable r) { 326 return new Thread(r, "waitForEsIndexing"); 327 } 328 } 329 330 protected void initListenerThreadPool() { 331 waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory())); 332 } 333 334 @Override 335 public void refresh() { 336 esa.refresh(); 337 } 338 339 @Override 340 public void refreshRepositoryIndex(String repositoryName) { 341 esa.refreshRepositoryIndex(repositoryName); 342 } 343 344 @Override 345 public void flush() { 346 esa.flush(); 347 } 348 349 @Override 350 public void flushRepositoryIndex(String repositoryName) { 351 esa.flushRepositoryIndex(repositoryName); 352 } 353 354 @Override 355 public void optimize() { 356 esa.optimize(); 357 } 358 359 @Override 360 public void optimizeRepositoryIndex(String repositoryName) { 361 esa.optimizeRepositoryIndex(repositoryName); 362 } 363 364 @Override 365 public void optimizeIndex(String indexName) { 366 esa.optimizeIndex(indexName); 367 } 368 369 // ES Indexing ============================================================= 370 371 @Override 372 public void indexNonRecursive(IndexingCommand cmd) { 373 List<IndexingCommand> cmds = new ArrayList<>(1); 374 cmds.add(cmd); 375 indexNonRecursive(cmds); 376 } 377 378 @Override 379 public void indexNonRecursive(List<IndexingCommand> cmds) { 380 if (!isReady()) { 381 stackCommands(cmds); 382 return; 383 } 384 if (log.isDebugEnabled()) { 385 log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray())); 386 } 387 esi.indexNonRecursive(cmds); 388 } 389 390 protected void stackCommands(List<IndexingCommand> cmds) { 391 if (log.isDebugEnabled()) { 392 log.debug("Delaying indexing commands: Waiting for Index to be initialized." 393 + Arrays.toString(cmds.toArray())); 394 } 395 stackedCommands.addAll(cmds); 396 } 397 398 @Override 399 public void runIndexingWorker(List<IndexingCommand> cmds) { 400 if (!isReady()) { 401 stackCommands(cmds); 402 return; 403 } 404 runIndexingWorkerCount.incrementAndGet(); 405 try { 406 dispatchWork(cmds); 407 } finally { 408 runIndexingWorkerCount.decrementAndGet(); 409 } 410 } 411 412 /** 413 * Dispatch jobs between sync and async worker 414 */ 415 protected void dispatchWork(List<IndexingCommand> cmds) { 416 Map<String, List<IndexingCommand>> syncCommands = new HashMap<>(); 417 Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>(); 418 for (IndexingCommand cmd : cmds) { 419 if (cmd.isSync()) { 420 List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName()); 421 if (syncCmds == null) { 422 syncCmds = new ArrayList<>(); 423 } 424 syncCmds.add(cmd); 425 syncCommands.put(cmd.getRepositoryName(), syncCmds); 426 } else { 427 List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName()); 428 if (asyncCmds == null) { 429 asyncCmds = new ArrayList<>(); 430 } 431 asyncCmds.add(cmd); 432 asyncCommands.put(cmd.getRepositoryName(), asyncCmds); 433 } 434 } 435 runIndexingSyncWorker(syncCommands); 436 scheduleIndexingAsyncWorker(asyncCommands); 437 } 438 439 protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) { 440 if (asyncCommands.isEmpty()) { 441 return; 442 } 443 WorkManager wm = Framework.getLocalService(WorkManager.class); 444 for (String repositoryName : asyncCommands.keySet()) { 445 IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName)); 446 // we are in afterCompletion don't wait for a commit 447 wm.schedule(idxWork, false); 448 } 449 } 450 451 protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) { 452 if (syncCommands.isEmpty()) { 453 return; 454 } 455 Transaction transaction = TransactionHelper.suspendTransaction(); 456 try { 457 for (String repositoryName : syncCommands.keySet()) { 458 IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName)); 459 idxWork.run(); 460 } 461 } finally { 462 if (transaction != null) { 463 TransactionHelper.resumeTransaction(transaction); 464 } 465 466 } 467 } 468 469 @Override 470 public void runReindexingWorker(String repositoryName, String nxql) { 471 if (nxql == null || nxql.isEmpty()) { 472 throw new IllegalArgumentException("Expecting an NXQL query"); 473 } 474 ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql); 475 WorkManager wm = Framework.getLocalService(WorkManager.class); 476 wm.schedule(worker); 477 } 478 479 // ES Search =============================================================== 480 @Override 481 public DocumentModelList query(NxQueryBuilder queryBuilder) { 482 return ess.query(queryBuilder); 483 } 484 485 @Override 486 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 487 return ess.queryAndAggregate(queryBuilder); 488 } 489 490 @Override 491 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 492 return ess.scroll(queryBuilder, keepAlive); 493 } 494 495 @Override 496 public EsScrollResult scroll(EsScrollResult scrollResult) { 497 return ess.scroll(scrollResult); 498 } 499 500 @Override 501 public void clearScroll(EsScrollResult scrollResult) { 502 ess.clearScroll(scrollResult); 503 } 504 505 @Deprecated 506 @Override 507 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 508 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 509 return query(query); 510 } 511 512 @Deprecated 513 @Override 514 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 515 SortInfo... sortInfos) { 516 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder) 517 .limit(limit) 518 .offset(offset) 519 .addSort(sortInfos); 520 return query(query); 521 } 522 523 // misc ==================================================================== 524 private boolean isReady() { 525 return (esa != null) && esa.isReady(); 526 } 527 528}