001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Tiry 016 * bdelbosc 017 */ 018package org.nuxeo.elasticsearch; 019 020import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 021import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 023 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.concurrent.Callable; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.Executors; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import javax.transaction.Transaction; 041 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.elasticsearch.client.Client; 045import org.elasticsearch.index.query.QueryBuilder; 046import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.core.api.DocumentModelList; 049import org.nuxeo.ecm.core.api.SortInfo; 050import org.nuxeo.ecm.core.repository.RepositoryService; 051import org.nuxeo.ecm.core.work.api.Work; 052import org.nuxeo.ecm.core.work.api.WorkManager; 053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 055import org.nuxeo.elasticsearch.api.ElasticSearchService; 056import org.nuxeo.elasticsearch.api.EsResult; 057import org.nuxeo.elasticsearch.commands.IndexingCommand; 058import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor; 059import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig; 060import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig; 061import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig; 062import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl; 063import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl; 064import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl; 065import org.nuxeo.elasticsearch.query.NxQueryBuilder; 066import org.nuxeo.elasticsearch.work.IndexingWorker; 067import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker; 068import org.nuxeo.runtime.api.Framework; 069import org.nuxeo.runtime.model.ComponentContext; 070import org.nuxeo.runtime.model.ComponentInstance; 071import org.nuxeo.runtime.model.DefaultComponent; 072import org.nuxeo.runtime.transaction.TransactionHelper; 073 074import com.google.common.util.concurrent.ListenableFuture; 075import com.google.common.util.concurrent.ListeningExecutorService; 076import com.google.common.util.concurrent.MoreExecutors; 077 078/** 079 * Component used to configure and manage ElasticSearch integration 080 */ 081public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing, 082 ElasticSearchService { 083 084 private static final Log log = LogFactory.getLog(ElasticSearchComponent.class); 085 086 private static final String EP_REMOTE = "elasticSearchRemote"; 087 088 private static final String EP_LOCAL = "elasticSearchLocal"; 089 090 private static final String EP_INDEX = "elasticSearchIndex"; 091 092 private static final String EP_DOC_WRITER = "elasticSearchDocWriter"; 093 094 private static final long REINDEX_TIMEOUT = 20; 095 096 // Indexing commands that where received before the index initialization 097 private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>()); 098 099 private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>(); 100 101 private ElasticSearchLocalConfig localConfig; 102 103 private ElasticSearchRemoteConfig remoteConfig; 104 105 private ElasticSearchAdminImpl esa; 106 107 private ElasticSearchIndexingImpl esi; 108 109 private ElasticSearchServiceImpl ess; 110 111 protected JsonESDocumentWriter jsonESDocumentWriter; 112 113 private ListeningExecutorService waiterExecutorService; 114 115 private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0); 116 117 // Nuxeo Component impl ======================================é============= 118 @Override 119 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 120 switch (extensionPoint) { 121 case EP_LOCAL: 122 ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution; 123 if (localContrib.isEnabled()) { 124 localConfig = localContrib; 125 remoteConfig = null; 126 log.info("Registering local embedded configuration: " + localConfig + ", loaded from " 127 + contributor.getName()); 128 } else if (localConfig != null) { 129 log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName()); 130 localConfig = null; 131 } 132 break; 133 case EP_REMOTE: 134 ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution; 135 if (remoteContribution.isEnabled()) { 136 remoteConfig = remoteContribution; 137 localConfig = null; 138 log.info("Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName()); 139 } else if (remoteConfig != null) { 140 log.info("Disabling previous remote configuration, deactivated by " + contributor.getName()); 141 remoteConfig = null; 142 } 143 break; 144 case EP_INDEX: 145 ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution; 146 ElasticSearchIndexConfig previous = indexConfig.get(idx.getName()); 147 if (idx.isEnabled()) { 148 idx.merge(previous); 149 indexConfig.put(idx.getName(), idx); 150 log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName()); 151 } else if (previous != null) { 152 log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName()); 153 indexConfig.remove(idx.getName()); 154 } 155 break; 156 case EP_DOC_WRITER: 157 ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution; 158 try { 159 jsonESDocumentWriter = writerDescriptor.getKlass().newInstance(); 160 } catch (IllegalAccessException | InstantiationException e) { 161 log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass()); 162 throw new RuntimeException(e); 163 } 164 break; 165 default: 166 throw new IllegalStateException("Invalid EP: " + extensionPoint); 167 } 168 } 169 170 @Override 171 public void applicationStarted(ComponentContext context) { 172 if (!isElasticsearchEnabled()) { 173 log.info("Elasticsearch service is disabled"); 174 return; 175 } 176 esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig); 177 esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter); 178 ess = new ElasticSearchServiceImpl(esa); 179 initListenerThreadPool(); 180 processStackedCommands(); 181 reindexOnStartup(); 182 } 183 184 private void reindexOnStartup() { 185 boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false")); 186 if (!reindexOnStartup) { 187 return; 188 } 189 for (String repositoryName : esa.getInitializedRepositories()) { 190 log.warn(String.format("Indexing repository: %s on startup", repositoryName)); 191 runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 192 try { 193 prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS); 194 } catch (InterruptedException e) { 195 Thread.currentThread().interrupt(); 196 } catch (ExecutionException e) { 197 log.error(e.getMessage(), e); 198 } catch (TimeoutException e) { 199 log.warn(String.format("Indexation of repository %s not finised after %d s, continuing in background", 200 repositoryName, REINDEX_TIMEOUT)); 201 } 202 } 203 } 204 205 protected boolean isElasticsearchEnabled() { 206 return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true")); 207 } 208 209 @Override 210 public void deactivate(ComponentContext context) { 211 if (esa != null) { 212 esa.disconnect(); 213 } 214 } 215 216 @Override 217 public int getApplicationStartedOrder() { 218 RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent( 219 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 220 return component.getApplicationStartedOrder() / 2; 221 } 222 223 void processStackedCommands() { 224 if (!stackedCommands.isEmpty()) { 225 log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size())); 226 runIndexingWorker(stackedCommands); 227 stackedCommands.clear(); 228 log.debug("Done"); 229 } 230 } 231 232 // Es Admin ================================================================ 233 234 @Override 235 public Client getClient() { 236 return esa.getClient(); 237 } 238 239 @Override 240 public void initIndexes(boolean dropIfExists) { 241 esa.initIndexes(dropIfExists); 242 } 243 244 @Override 245 public void dropAndInitIndex(String indexName) { 246 esa.dropAndInitIndex(indexName); 247 } 248 249 @Override 250 public void dropAndInitRepositoryIndex(String repositoryName) { 251 esa.dropAndInitRepositoryIndex(repositoryName); 252 } 253 254 @Override 255 public List<String> getRepositoryNames() { 256 return esa.getRepositoryNames(); 257 } 258 259 @Override 260 public String getIndexNameForRepository(String repositoryName) { 261 return esa.getIndexNameForRepository(repositoryName); 262 } 263 264 @Override 265 public List<String> getIndexNamesForType(String type) { 266 return esa.getIndexNamesForType(type); 267 } 268 269 @Override 270 public String getIndexNameForType(String type) { 271 return esa.getIndexNameForType(type); 272 } 273 274 @Override 275 public int getPendingWorkerCount() { 276 WorkManager wm = Framework.getLocalService(WorkManager.class); 277 return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED); 278 } 279 280 @Override 281 public int getRunningWorkerCount() { 282 WorkManager wm = Framework.getLocalService(WorkManager.class); 283 return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING); 284 } 285 286 @Override 287 public int getTotalCommandProcessed() { 288 return esa.getTotalCommandProcessed(); 289 } 290 291 @Override 292 public boolean isEmbedded() { 293 return esa.isEmbedded(); 294 } 295 296 @Override 297 public boolean isIndexingInProgress() { 298 return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0); 299 } 300 301 @Override 302 public ListenableFuture<Boolean> prepareWaitForIndexing() { 303 return waiterExecutorService.submit(new Callable<Boolean>() { 304 @Override 305 public Boolean call() throws Exception { 306 WorkManager wm = Framework.getLocalService(WorkManager.class); 307 wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS); 308 return true; 309 } 310 }); 311 } 312 313 private static class NamedThreadFactory implements ThreadFactory { 314 @SuppressWarnings("NullableProblems") 315 @Override 316 public Thread newThread(Runnable r) { 317 return new Thread(r, "waitForEsIndexing"); 318 } 319 } 320 321 protected void initListenerThreadPool() { 322 waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory())); 323 } 324 325 @Override 326 public void refresh() { 327 esa.refresh(); 328 } 329 330 @Override 331 public void refreshRepositoryIndex(String repositoryName) { 332 esa.refreshRepositoryIndex(repositoryName); 333 } 334 335 @Override 336 public void flush() { 337 esa.flush(); 338 } 339 340 @Override 341 public void flushRepositoryIndex(String repositoryName) { 342 esa.flushRepositoryIndex(repositoryName); 343 } 344 345 @Override 346 public void optimize() { 347 esa.optimize(); 348 } 349 350 @Override 351 public void optimizeRepositoryIndex(String repositoryName) { 352 esa.optimizeRepositoryIndex(repositoryName); 353 } 354 355 @Override 356 public void optimizeIndex(String indexName) { 357 esa.optimizeIndex(indexName); 358 } 359 360 // ES Indexing ============================================================= 361 362 @Override 363 public void indexNonRecursive(IndexingCommand cmd) { 364 List<IndexingCommand> cmds = new ArrayList<>(1); 365 cmds.add(cmd); 366 indexNonRecursive(cmds); 367 } 368 369 @Override 370 public void indexNonRecursive(List<IndexingCommand> cmds) { 371 if (!isReady()) { 372 stackCommands(cmds); 373 return; 374 } 375 if (log.isDebugEnabled()) { 376 log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray())); 377 } 378 esi.indexNonRecursive(cmds); 379 } 380 381 protected void stackCommands(List<IndexingCommand> cmds) { 382 if (log.isDebugEnabled()) { 383 log.debug("Delaying indexing commands: Waiting for Index to be initialized." 384 + Arrays.toString(cmds.toArray())); 385 } 386 stackedCommands.addAll(cmds); 387 } 388 389 @Override 390 public void runIndexingWorker(List<IndexingCommand> cmds) { 391 if (!isReady()) { 392 stackCommands(cmds); 393 return; 394 } 395 runIndexingWorkerCount.incrementAndGet(); 396 try { 397 dispatchWork(cmds); 398 } finally { 399 runIndexingWorkerCount.decrementAndGet(); 400 } 401 } 402 403 /** 404 * Dispatch jobs between sync and async worker 405 */ 406 protected void dispatchWork(List<IndexingCommand> cmds) { 407 Map<String, List<IndexingCommand>> syncCommands = new HashMap<>(); 408 Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>(); 409 for (IndexingCommand cmd : cmds) { 410 if (cmd.isSync()) { 411 List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName()); 412 if (syncCmds == null) { 413 syncCmds = new ArrayList<>(); 414 } 415 syncCmds.add(cmd); 416 syncCommands.put(cmd.getRepositoryName(), syncCmds); 417 } else { 418 List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName()); 419 if (asyncCmds == null) { 420 asyncCmds = new ArrayList<>(); 421 } 422 asyncCmds.add(cmd); 423 asyncCommands.put(cmd.getRepositoryName(), asyncCmds); 424 } 425 } 426 runIndexingSyncWorker(syncCommands); 427 scheduleIndexingAsyncWorker(asyncCommands); 428 } 429 430 protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) { 431 if (asyncCommands.isEmpty()) { 432 return; 433 } 434 WorkManager wm = Framework.getLocalService(WorkManager.class); 435 for (String repositoryName : asyncCommands.keySet()) { 436 IndexingWorker idxWork = new IndexingWorker(repositoryName, 437 asyncCommands.get(repositoryName)); 438 // we are in afterCompletion don't wait for a commit 439 wm.schedule(idxWork, false); 440 } 441 } 442 443 protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) { 444 if (syncCommands.isEmpty()) { 445 return; 446 } 447 Transaction transaction = TransactionHelper.suspendTransaction(); 448 try { 449 for (String repositoryName : syncCommands.keySet()) { 450 IndexingWorker idxWork = new IndexingWorker(repositoryName, 451 syncCommands.get(repositoryName)); 452 idxWork.run(); 453 } 454 } finally { 455 if (transaction != null) { 456 TransactionHelper.resumeTransaction(transaction); 457 } 458 459 } 460 } 461 462 @Override 463 public void runReindexingWorker(String repositoryName, String nxql) { 464 if (nxql == null || nxql.isEmpty()) { 465 throw new IllegalArgumentException("Expecting an NXQL query"); 466 } 467 ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql); 468 WorkManager wm = Framework.getLocalService(WorkManager.class); 469 wm.schedule(worker); 470 } 471 472 // ES Search =============================================================== 473 @Override 474 public DocumentModelList query(NxQueryBuilder queryBuilder) { 475 return ess.query(queryBuilder); 476 } 477 478 @Override 479 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 480 return ess.queryAndAggregate(queryBuilder); 481 } 482 483 @Deprecated 484 @Override 485 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) 486 { 487 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 488 return query(query); 489 } 490 491 @Deprecated 492 @Override 493 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 494 SortInfo... sortInfos) { 495 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort( 496 sortInfos); 497 return query(query); 498 } 499 500 // misc ==================================================================== 501 private boolean isReady() { 502 return (esa != null) && esa.isReady(); 503 } 504 505}