001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020package org.nuxeo.elasticsearch; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY; 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID; 024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY; 025 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.Callable; 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.Executors; 037import java.util.concurrent.ThreadFactory; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import javax.transaction.Transaction; 043 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.elasticsearch.client.Client; 047import org.elasticsearch.index.query.QueryBuilder; 048import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter; 049import org.nuxeo.ecm.core.api.CoreSession; 050import org.nuxeo.ecm.core.api.DocumentModelList; 051import 
org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.elasticsearch.work.IndexingWorker;
import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * Component used to configure and manage ElasticSearch integration.
 * <p>
 * It collects the local/remote/index/document-writer contributions and builds the admin, indexing and search
 * implementations once the application is started. It then delegates the {@link ElasticSearchAdmin},
 * {@link ElasticSearchIndexing} and {@link ElasticSearchService} APIs to those implementations. Indexing commands
 * received before the component is ready are stacked and processed at startup.
 */
public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
        ElasticSearchService {

    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);

    private static final String EP_REMOTE = "elasticSearchRemote";

    private static final String EP_LOCAL = "elasticSearchLocal";

    private static final String EP_INDEX = "elasticSearchIndex";

    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";

    /** Seconds to wait for each repository reindexing on startup before letting it continue in background. */
    private static final long REINDEX_TIMEOUT = 20;

    // Indexing commands that were received before the index initialization.
    // Synchronized list: any compound operation (iterate, copy + clear) must hold its monitor.
    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());

    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();

    private ElasticSearchLocalConfig localConfig;

    private ElasticSearchRemoteConfig remoteConfig;

    private ElasticSearchAdminImpl esa;

    private ElasticSearchIndexingImpl esi;

    private ElasticSearchServiceImpl ess;

    protected JsonESDocumentWriter jsonESDocumentWriter;

    private ListeningExecutorService waiterExecutorService;

    // Number of runIndexingWorker calls currently dispatching commands
    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);

    // Nuxeo Component impl ====================================================

    /**
     * Registers a contribution to one of the component extension points.
     * <p>
     * Enabling a local configuration discards the remote one and vice versa. Index configurations are merged with
     * any previous contribution of the same name, and disabled ones are removed.
     *
     * @throws IllegalStateException if the extension point is unknown
     * @throws RuntimeException if the contributed document writer class cannot be instantiated
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        switch (extensionPoint) {
        case EP_LOCAL:
            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
            if (localContrib.isEnabled()) {
                localConfig = localContrib;
                remoteConfig = null;
                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
                        + contributor.getName());
            } else if (localConfig != null) {
                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
                localConfig = null;
            }
            break;
        case EP_REMOTE:
            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
            if (remoteContribution.isEnabled()) {
                remoteConfig = remoteContribution;
                localConfig = null;
                log.info("Registering remote configuration: " + remoteConfig + ", loaded from "
                        + contributor.getName());
            } else if (remoteConfig != null) {
                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
                remoteConfig = null;
            }
            break;
        case EP_INDEX:
            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
            if (idx.isEnabled()) {
                idx.merge(previous);
                indexConfig.put(idx.getName(), idx);
                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
            } else if (previous != null) {
                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
                indexConfig.remove(idx.getName());
            }
            break;
        case EP_DOC_WRITER:
            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
            try {
                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
            } catch (IllegalAccessException | InstantiationException e) {
                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
                throw new RuntimeException(e);
            }
            break;
        default:
            throw new IllegalStateException("Invalid EP: " + extensionPoint);
        }
    }

    /**
     * Builds the admin/indexing/search implementations from the registered contributions. It then processes the
     * commands stacked during startup and optionally reindexes the repositories. Does nothing when Elasticsearch is
     * disabled.
     */
    @Override
    public void applicationStarted(ComponentContext context) {
        if (!isElasticsearchEnabled()) {
            log.info("Elasticsearch service is disabled");
            return;
        }
        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig);
        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
        ess = new ElasticSearchServiceImpl(esa);
        initListenerThreadPool();
        processStackedCommands();
        reindexOnStartup();
    }

    /**
     * When the reindex-on-startup property is true, schedules a full reindexing of every initialized repository.
     * For each repository it waits up to {@link #REINDEX_TIMEOUT} seconds for the indexing queue to drain.
     */
    private void reindexOnStartup() {
        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
        if (!reindexOnStartup) {
            return;
        }
        for (String repositoryName : esa.getInitializedRepositories()) {
            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
            if (Thread.currentThread().isInterrupted()) {
                // A previous wait was interrupted: keep scheduling the reindexing, but don't spawn a
                // waiter task whose result could never be awaited
                continue;
            }
            try {
                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.error(e.getMessage(), e);
            } catch (TimeoutException e) {
                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
                        repositoryName, REINDEX_TIMEOUT));
            }
        }
    }

    /** Returns the value of the Elasticsearch enabled property; it defaults to {@code true}. */
    protected boolean isElasticsearchEnabled() {
        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
    }

    /**
     * Stops the waiter threads created by {@link #prepareWaitForIndexing()} and disconnects from Elasticsearch.
     */
    @Override
    public void deactivate(ComponentContext context) {
        if (waiterExecutorService != null) {
            // Waiter tasks loop until the indexing queue completes; interrupt them so they don't outlive the component
            waiterExecutorService.shutdownNow();
        }
        if (esa != null) {
            esa.disconnect();
        }
    }

    /** Returns half of the repository service component's application started order. */
    @Override
    public int getApplicationStartedOrder() {
        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
        return component.getApplicationStartedOrder() / 2;
    }

    /**
     * Runs the indexing commands stacked before the component was ready.
     * <p>
     * The stack is snapshotted and cleared atomically, so concurrent additions are neither lost nor iterated unsafely.
     * If the component is still not ready, the commands are stacked again rather than dropped.
     */
    void processStackedCommands() {
        List<IndexingCommand> cmds;
        synchronized (stackedCommands) {
            if (stackedCommands.isEmpty()) {
                return;
            }
            cmds = new ArrayList<>(stackedCommands);
            stackedCommands.clear();
        }
        log.info(String.format("Processing %d indexing commands stacked during startup", cmds.size()));
        runIndexingWorker(cmds);
        log.debug("Done");
    }

    // Es Admin ================================================================

    @Override
    public Client getClient() {
        return esa.getClient();
    }

    @Override
    public void initIndexes(boolean dropIfExists) {
        esa.initIndexes(dropIfExists);
    }

    @Override
    public void dropAndInitIndex(String indexName) {
        esa.dropAndInitIndex(indexName);
    }

    @Override
    public void dropAndInitRepositoryIndex(String repositoryName) {
        esa.dropAndInitRepositoryIndex(repositoryName);
    }

    @Override
    public List<String> getRepositoryNames() {
        return esa.getRepositoryNames();
    }

    @Override
    public String getIndexNameForRepository(String repositoryName) {
        return esa.getIndexNameForRepository(repositoryName);
    }

    @Override
    public List<String> getIndexNamesForType(String type) {
        return esa.getIndexNamesForType(type);
    }

    @Override
    public String getIndexNameForType(String type) {
        return esa.getIndexNameForType(type);
    }

    /** Returns the number of scheduled works in the indexing queue. */
    @Override
    public int getPendingWorkerCount() {
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
    }

    /** Returns the in-flight {@link #runIndexingWorker} calls plus the running works of the indexing queue. */
    @Override
    public int getRunningWorkerCount() {
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
    }

    @Override
    public int getTotalCommandProcessed() {
        return esa.getTotalCommandProcessed();
    }

    @Override
    public boolean isEmbedded() {
        return esa.isEmbedded();
    }

    @Override
    public boolean isIndexingInProgress() {
        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
    }

    /**
     * Submits a task to the waiter pool that completes with {@code true} once the indexing queue is drained.
     * It polls {@code awaitCompletion} in 300-second slices until completion.
     */
    @Override
    public ListenableFuture<Boolean> prepareWaitForIndexing() {
        Callable<Boolean> waiter = () -> {
            WorkManager wm = Framework.getLocalService(WorkManager.class);
            boolean completed;
            do {
                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
            } while (!completed);
            return true;
        };
        return waiterExecutorService.submit(waiter);
    }

    /** Names every waiter thread "waitForEsIndexing". */
    private static class NamedThreadFactory implements ThreadFactory {
        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "waitForEsIndexing");
        }
    }

    /** Creates the cached thread pool used by {@link #prepareWaitForIndexing()}. */
    protected void initListenerThreadPool() {
        waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory()));
    }

    @Override
    public void refresh() {
        esa.refresh();
    }

    @Override
    public void refreshRepositoryIndex(String repositoryName) {
        esa.refreshRepositoryIndex(repositoryName);
    }

    @Override
    public void flush() {
        esa.flush();
    }

    @Override
    public void flushRepositoryIndex(String repositoryName) {
        esa.flushRepositoryIndex(repositoryName);
    }

    @Override
    public void optimize() {
        esa.optimize();
    }

    @Override
    public void optimizeRepositoryIndex(String repositoryName) {
        esa.optimizeRepositoryIndex(repositoryName);
    }

    @Override
    public void optimizeIndex(String indexName) {
        esa.optimizeIndex(indexName);
    }

    // ES Indexing =============================================================

    @Override
    public void indexNonRecursive(IndexingCommand cmd) {
        List<IndexingCommand> cmds = new ArrayList<>(1);
        cmds.add(cmd);
        indexNonRecursive(cmds);
    }

    /** Indexes the commands directly, or stacks them when the component is not ready yet. */
    @Override
    public void indexNonRecursive(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
        }
        esi.indexNonRecursive(cmds);
    }

    /** Keeps the commands until {@link #processStackedCommands()} runs them. */
    protected void stackCommands(List<IndexingCommand> cmds) {
        if (log.isDebugEnabled()) {
            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
                    + Arrays.toString(cmds.toArray()));
        }
        stackedCommands.addAll(cmds);
    }

    /**
     * Dispatches the commands to sync/async indexing workers, or stacks them when the component is not ready yet.
     * While dispatching, the call is counted in {@link #getRunningWorkerCount()}.
     */
    @Override
    public void runIndexingWorker(List<IndexingCommand> cmds) {
        if (!isReady()) {
            stackCommands(cmds);
            return;
        }
        runIndexingWorkerCount.incrementAndGet();
        try {
            dispatchWork(cmds);
        } finally {
            runIndexingWorkerCount.decrementAndGet();
        }
    }

    /**
     * Dispatch jobs between sync and async worker.
     * <p>
     * Commands are grouped per repository, keeping their relative order. Sync groups run in the calling thread;
     * async groups are scheduled on the work manager.
     */
    protected void dispatchWork(List<IndexingCommand> cmds) {
        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
        for (IndexingCommand cmd : cmds) {
            Map<String, List<IndexingCommand>> target = cmd.isSync() ? syncCommands : asyncCommands;
            target.computeIfAbsent(cmd.getRepositoryName(), repositoryName -> new ArrayList<>()).add(cmd);
        }
        runIndexingSyncWorker(syncCommands);
        scheduleIndexingAsyncWorker(asyncCommands);
    }

    /** Schedules one {@link IndexingWorker} per repository without waiting for a transaction commit. */
    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
        if (asyncCommands.isEmpty()) {
            return;
        }
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        for (Map.Entry<String, List<IndexingCommand>> entry : asyncCommands.entrySet()) {
            IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
            // we are in afterCompletion don't wait for a commit
            wm.schedule(idxWork, false);
        }
    }

    /**
     * Runs one {@link IndexingWorker} per repository in the calling thread, outside of the current transaction.
     * The current transaction, if any, is suspended and resumed afterwards, even on failure.
     */
    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
        if (syncCommands.isEmpty()) {
            return;
        }
        Transaction transaction = TransactionHelper.suspendTransaction();
        try {
            for (Map.Entry<String, List<IndexingCommand>> entry : syncCommands.entrySet()) {
                IndexingWorker idxWork = new IndexingWorker(entry.getKey(), entry.getValue());
                idxWork.run();
            }
        } finally {
            if (transaction != null) {
                TransactionHelper.resumeTransaction(transaction);
            }
        }
    }

    /**
     * Schedules a {@link ScrollingIndexingWorker} that reindexes the documents matched by the NXQL query.
     *
     * @throws IllegalArgumentException if {@code nxql} is null or empty
     */
    @Override
    public void runReindexingWorker(String repositoryName, String nxql) {
        if (nxql == null || nxql.isEmpty()) {
            throw new IllegalArgumentException("Expecting an NXQL query");
        }
        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        wm.schedule(worker);
    }

    // ES Search ===============================================================

    @Override
    public DocumentModelList query(NxQueryBuilder queryBuilder) {
        return ess.query(queryBuilder);
    }

    @Override
    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
        return ess.queryAndAggregate(queryBuilder);
    }

    /** @deprecated use {@link #query(NxQueryBuilder)} */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
        return query(query);
    }

    /** @deprecated use {@link #query(NxQueryBuilder)} */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
            SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
                sortInfos);
        return query(query);
    }

    // misc ====================================================================

    /** True once the admin implementation exists and reports itself ready. */
    private boolean isReady() {
        return (esa != null) && esa.isReady();
    }

}