001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.TimeoutException;
039import java.util.concurrent.atomic.AtomicInteger;
040
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.elasticsearch.common.bytes.BytesReference;
044import org.elasticsearch.index.query.QueryBuilder;
045import org.nuxeo.ecm.core.api.CoreSession;
046import org.nuxeo.ecm.core.api.DocumentModel;
047import org.nuxeo.ecm.core.api.DocumentModelList;
048import org.nuxeo.ecm.core.api.NuxeoException;
049import org.nuxeo.ecm.core.api.SortInfo;
050import org.nuxeo.ecm.core.repository.RepositoryService;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.elasticsearch.api.ESClient;
054import org.nuxeo.elasticsearch.api.ESHintQueryBuilder;
055import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
056import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
057import org.nuxeo.elasticsearch.api.ElasticSearchService;
058import org.nuxeo.elasticsearch.api.EsResult;
059import org.nuxeo.elasticsearch.api.EsScrollResult;
060import org.nuxeo.elasticsearch.commands.IndexingCommand;
061import org.nuxeo.elasticsearch.config.ESHintQueryBuilderDescriptor;
062import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
063import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
064import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
065import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
066import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
067import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
068import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
069import org.nuxeo.elasticsearch.io.JsonESDocumentWriter;
070import org.nuxeo.elasticsearch.query.NxQueryBuilder;
071import org.nuxeo.elasticsearch.work.IndexingWorker;
072import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
073import org.nuxeo.runtime.api.Framework;
074import org.nuxeo.runtime.model.ComponentContext;
075import org.nuxeo.runtime.model.ComponentInstance;
076import org.nuxeo.runtime.model.DefaultComponent;
077import org.nuxeo.runtime.transaction.TransactionHelper;
078
079import com.google.common.util.concurrent.ListenableFuture;
080import com.google.common.util.concurrent.ListeningExecutorService;
081import com.google.common.util.concurrent.MoreExecutors;
082
083/**
084 * Component used to configure and manage ElasticSearch integration
085 */
086public class ElasticSearchComponent extends DefaultComponent
087        implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService {
088
089    protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
090
091    protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer";
092
093    protected static final String EP_CLIENT_INIT = "elasticSearchClient";
094
095    protected static final String EP_INDEX = "elasticSearchIndex";
096
097    protected static final String EP_DOC_WRITER = "elasticSearchDocWriter";
098
099    /**
100     * @since 11.1
101     */
102    protected static final String EP_HINTS = "elasticSearchHints";
103
104    protected static final long REINDEX_TIMEOUT = 20;
105
106    // Indexing commands that where received before the index initialization
107    protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
108
109    protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
110
111    protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
112
113    protected ElasticSearchEmbeddedServerConfig embeddedServerConfig;
114
115    protected ElasticSearchClientConfig clientConfig;
116
117    protected ElasticSearchAdminImpl esa;
118
119    protected ElasticSearchIndexingImpl esi;
120
121    protected ElasticSearchServiceImpl ess;
122
123    protected JsonESDocumentWriter jsonESDocumentWriter;
124
125    protected ListeningExecutorService waiterExecutorService;
126
127    // Nuxeo Component impl ======================================é=============
128    @Override
129    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
130        switch (extensionPoint) {
131        case EP_EMBEDDED_SERVER:
132            ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution;
133            if (serverContrib.isEnabled()) {
134                embeddedServerConfig = serverContrib;
135                log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from "
136                        + contributor.getName());
137            } else if (embeddedServerConfig != null) {
138                log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName());
139                embeddedServerConfig = null;
140            }
141            break;
142        case EP_CLIENT_INIT:
143            clientConfig = (ElasticSearchClientConfig) contribution;
144            break;
145        case EP_INDEX:
146            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
147            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
148            if (idx.isEnabled()) {
149                idx.merge(previous);
150                indexConfig.put(idx.getName(), idx);
151                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
152            } else if (previous != null) {
153                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
154                indexConfig.remove(idx.getName());
155            }
156            break;
157        case EP_DOC_WRITER:
158            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
159            try {
160                jsonESDocumentWriter = writerDescriptor.getKlass().getDeclaredConstructor().newInstance();
161            } catch (ReflectiveOperationException e) {
162                log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
163                throw new NuxeoException(e);
164            }
165            break;
166        case EP_HINTS:
167            ESHintQueryBuilderDescriptor esHintDescriptor = (ESHintQueryBuilderDescriptor) contribution;
168            register(EP_HINTS, esHintDescriptor);
169            break;
170        default:
171            throw new IllegalStateException("Invalid EP: " + extensionPoint);
172        }
173    }
174
175    @Override
176    public void start(ComponentContext context) {
177        if (!isElasticsearchEnabled()) {
178            log.info("Elasticsearch service is disabled");
179            return;
180        }
181        esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig, getDescriptors(EP_HINTS));
182        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
183        ess = new ElasticSearchServiceImpl(esa);
184        initListenerThreadPool();
185        processStackedCommands();
186        reindexOnStartup();
187    }
188
189    @Override
190    public void stop(ComponentContext context) {
191        if (esa == null) {
192            // Elasticsearch service was disabled
193            return;
194        }
195        try {
196            shutdownListenerThreadPool();
197        } finally {
198            try {
199                esa.disconnect();
200            } finally {
201                esa = null;
202                esi = null;
203                ess = null;
204            }
205        }
206    }
207
208    protected void reindexOnStartup() {
209        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
210        if (!reindexOnStartup) {
211            return;
212        }
213        for (String repositoryName : esa.getInitializedRepositories()) {
214            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
215            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
216            try {
217                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
218            } catch (InterruptedException e) {
219                Thread.currentThread().interrupt();
220            } catch (ExecutionException e) {
221                log.error(e.getMessage(), e);
222            } catch (TimeoutException e) {
223                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
224                        repositoryName, REINDEX_TIMEOUT));
225            }
226        }
227    }
228
229    protected boolean isElasticsearchEnabled() {
230        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
231    }
232
233    @Override
234    public int getApplicationStartedOrder() {
235        RepositoryService component = (RepositoryService) Framework.getRuntime()
236                                                                   .getComponent(
237                                                                           "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
238        return component.getApplicationStartedOrder() / 2;
239    }
240
241    void processStackedCommands() {
242        if (!stackedCommands.isEmpty()) {
243            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
244            runIndexingWorker(stackedCommands);
245            stackedCommands.clear();
246            log.debug("Done");
247        }
248    }
249
250    // Es Admin ================================================================
251
252    @Override
253    public ESClient getClient() {
254        return esa.getClient();
255    }
256
257    @Override
258    public void initIndexes(boolean dropIfExists) {
259        esa.initIndexes(dropIfExists);
260    }
261
262    @Override
263    public void dropAndInitIndex(String indexName) {
264        esa.dropAndInitIndex(indexName);
265    }
266
267    @Override
268    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
269        esa.dropAndInitRepositoryIndex(repositoryName, syncAlias);
270    }
271
272    @Override
273    public List<String> getRepositoryNames() {
274        return esa.getRepositoryNames();
275    }
276
277    @Override
278    public String getIndexNameForRepository(String repositoryName) {
279        return esa.getIndexNameForRepository(repositoryName);
280    }
281
282    @Override
283    public String getRepositoryForIndex(String indexName) {
284        return esa.getRepositoryForIndex(indexName);
285    }
286
287    @Override
288    public List<String> getIndexNamesForType(String type) {
289        return esa.getIndexNamesForType(type);
290    }
291
292    @Override
293    public String getIndexNameForType(String type) {
294        return esa.getIndexNameForType(type);
295    }
296
297    @Override
298    public String getWriteIndexName(String searchIndexName) {
299        return esa.getWriteIndexName(searchIndexName);
300    }
301
302    @Override
303    public void syncSearchAndWriteAlias(String searchIndexName) {
304        esa.syncSearchAndWriteAlias(searchIndexName);
305    }
306
307    @SuppressWarnings("deprecation")
308    @Override
309    public long getPendingWorkerCount() {
310        WorkManager wm = Framework.getService(WorkManager.class);
311        // api is deprecated for completed work
312        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
313    }
314
315    @SuppressWarnings("deprecation")
316    @Override
317    public long getRunningWorkerCount() {
318        WorkManager wm = Framework.getService(WorkManager.class);
319        // api is deprecated for completed work
320        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
321    }
322
323    @Override
324    public int getTotalCommandProcessed() {
325        return esa.getTotalCommandProcessed();
326    }
327
328    @Override
329    public boolean isEmbedded() {
330        return esa.isEmbedded();
331    }
332
333    @Override
334    public boolean useExternalVersion() {
335        return esa.useExternalVersion();
336    }
337
338    @Override
339    public boolean isIndexingInProgress() {
340        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
341    }
342
343    @Override
344    public ListenableFuture<Boolean> prepareWaitForIndexing() {
345        return waiterExecutorService.submit(() -> {
346            WorkManager wm = Framework.getService(WorkManager.class);
347            boolean completed;
348            do {
349                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
350            } while (!completed);
351            return true;
352        });
353    }
354
355    protected void initListenerThreadPool() {
356        waiterExecutorService = MoreExecutors.listeningDecorator(
357                Executors.newCachedThreadPool(new NamedThreadFactory()));
358    }
359
360    protected void shutdownListenerThreadPool() {
361        try {
362            waiterExecutorService.shutdown();
363        } finally {
364            waiterExecutorService = null;
365        }
366    }
367
368    @Override
369    public void refresh() {
370        esa.refresh();
371    }
372
373    @Override
374    public void refreshRepositoryIndex(String repositoryName) {
375        esa.refreshRepositoryIndex(repositoryName);
376    }
377
378    @Override
379    public void flush() {
380        esa.flush();
381    }
382
383    @Override
384    public void flushRepositoryIndex(String repositoryName) {
385        esa.flushRepositoryIndex(repositoryName);
386    }
387
388    @Override
389    public void optimize() {
390        esa.optimize();
391    }
392
393    @Override
394    public void optimizeRepositoryIndex(String repositoryName) {
395        esa.optimizeRepositoryIndex(repositoryName);
396    }
397
398    @Override
399    public void optimizeIndex(String indexName) {
400        esa.optimizeIndex(indexName);
401    }
402
403    @Override
404    public void indexNonRecursive(IndexingCommand cmd) {
405        indexNonRecursive(Collections.singletonList(cmd));
406    }
407
408    // ES Indexing =============================================================
409
410    @Override
411    public void indexNonRecursive(List<IndexingCommand> cmds) {
412        if (!isReady()) {
413            stackCommands(cmds);
414            return;
415        }
416        if (log.isDebugEnabled()) {
417            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
418        }
419        esi.indexNonRecursive(cmds);
420    }
421
422    protected void stackCommands(List<IndexingCommand> cmds) {
423        if (log.isDebugEnabled()) {
424            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
425                    + Arrays.toString(cmds.toArray()));
426        }
427        stackedCommands.addAll(cmds);
428    }
429
430    @Override
431    public void runIndexingWorker(List<IndexingCommand> cmds) {
432        if (!isReady()) {
433            stackCommands(cmds);
434            return;
435        }
436        runIndexingWorkerCount.incrementAndGet();
437        try {
438            dispatchWork(cmds);
439        } finally {
440            runIndexingWorkerCount.decrementAndGet();
441        }
442    }
443
444    /**
445     * Dispatch jobs between sync and async worker
446     */
447    protected void dispatchWork(List<IndexingCommand> cmds) {
448        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
449        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
450        for (IndexingCommand cmd : cmds) {
451            if (cmd.isSync()) {
452                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
453                if (syncCmds == null) {
454                    syncCmds = new ArrayList<>();
455                }
456                syncCmds.add(cmd);
457                syncCommands.put(cmd.getRepositoryName(), syncCmds);
458            } else {
459                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
460                if (asyncCmds == null) {
461                    asyncCmds = new ArrayList<>();
462                }
463                asyncCmds.add(cmd);
464                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
465            }
466        }
467        runIndexingSyncWorker(syncCommands);
468        scheduleIndexingAsyncWorker(asyncCommands);
469    }
470
471    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
472        if (asyncCommands.isEmpty()) {
473            return;
474        }
475        WorkManager wm = Framework.getService(WorkManager.class);
476        for (String repositoryName : asyncCommands.keySet()) {
477            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
478            // we are in afterCompletion don't wait for a commit
479            wm.schedule(idxWork, false);
480        }
481    }
482
483    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
484        if (syncCommands.isEmpty()) {
485            return;
486        }
487        TransactionHelper.runWithoutTransaction(() -> {
488            for (String repositoryName : syncCommands.keySet()) {
489                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
490                idxWork.run();
491            }
492        });
493    }
494
495    @Override
496    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
497        if (nxql == null || nxql.isEmpty()) {
498            throw new IllegalArgumentException("Expecting an NXQL query");
499        }
500        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias);
501        WorkManager wm = Framework.getService(WorkManager.class);
502        wm.schedule(worker);
503    }
504
505    @Override
506    public void reindexRepository(String repositoryName) {
507        esa.dropAndInitRepositoryIndex(repositoryName, false);
508        runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true);
509    }
510
511    @Override
512    public BytesReference source(DocumentModel doc) throws IOException {
513        return esi.source(doc);
514    }
515
516    // ES Search ===============================================================
517    @Override
518    public DocumentModelList query(NxQueryBuilder queryBuilder) {
519        return ess.query(queryBuilder);
520    }
521
522    @Override
523    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
524        return ess.queryAndAggregate(queryBuilder);
525    }
526
527    @Override
528    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
529        return ess.scroll(queryBuilder, keepAlive);
530    }
531
532    @Override
533    public EsScrollResult scroll(EsScrollResult scrollResult) {
534        return ess.scroll(scrollResult);
535    }
536
537    @Override
538    public void clearScroll(EsScrollResult scrollResult) {
539        ess.clearScroll(scrollResult);
540    }
541
542    @Deprecated
543    @Override
544    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
545        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
546        return query(query);
547    }
548
549    @Deprecated
550    @Override
551    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
552            SortInfo... sortInfos) {
553        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
554                                                          .limit(limit)
555                                                          .offset(offset)
556                                                          .addSort(sortInfos);
557        return query(query);
558    }
559
560    // misc ====================================================================
561    protected boolean isReady() {
562        return (esa != null) && esa.isReady();
563    }
564
565    protected static class NamedThreadFactory implements ThreadFactory {
566        @SuppressWarnings("NullableProblems")
567        @Override
568        public Thread newThread(Runnable r) {
569            return new Thread(r, "waitForEsIndexing");
570        }
571    }
572
573    @Override
574    public Optional<ESHintQueryBuilder> getHintByOperator(String name) {
575        return esa.getHintByOperator(name);
576    }
577}