001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.Executors;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import javax.transaction.Transaction;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.elasticsearch.common.bytes.BytesReference;
045import org.elasticsearch.index.query.QueryBuilder;
046import org.nuxeo.ecm.core.api.CoreSession;
047import org.nuxeo.ecm.core.api.DocumentModel;
048import org.nuxeo.ecm.core.api.DocumentModelList;
049import org.nuxeo.ecm.core.api.NuxeoException;
050import org.nuxeo.ecm.core.api.SortInfo;
051import org.nuxeo.ecm.core.repository.RepositoryService;
052import org.nuxeo.ecm.core.work.api.Work;
053import org.nuxeo.ecm.core.work.api.WorkManager;
054import org.nuxeo.elasticsearch.api.ESClient;
055import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
056import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
057import org.nuxeo.elasticsearch.api.ElasticSearchService;
058import org.nuxeo.elasticsearch.api.EsResult;
059import org.nuxeo.elasticsearch.api.EsScrollResult;
060import org.nuxeo.elasticsearch.commands.IndexingCommand;
061import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
062import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
063import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
064import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
065import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
066import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
067import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
068import org.nuxeo.elasticsearch.io.JsonESDocumentWriter;
069import org.nuxeo.elasticsearch.query.NxQueryBuilder;
070import org.nuxeo.elasticsearch.work.IndexingWorker;
071import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
072import org.nuxeo.runtime.api.Framework;
073import org.nuxeo.runtime.model.ComponentContext;
074import org.nuxeo.runtime.model.ComponentInstance;
075import org.nuxeo.runtime.model.DefaultComponent;
076import org.nuxeo.runtime.transaction.TransactionHelper;
077
078import com.google.common.util.concurrent.ListenableFuture;
079import com.google.common.util.concurrent.ListeningExecutorService;
080import com.google.common.util.concurrent.MoreExecutors;
081
082/**
083 * Component used to configure and manage ElasticSearch integration
084 */
085public class ElasticSearchComponent extends DefaultComponent
086        implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService {
087
088    protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
089
090    protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer";
091
092    protected static final String EP_CLIENT_INIT = "elasticSearchClient";
093
094    protected static final String EP_INDEX = "elasticSearchIndex";
095
096    protected static final String EP_DOC_WRITER = "elasticSearchDocWriter";
097
098    protected static final long REINDEX_TIMEOUT = 20;
099
100    // Indexing commands that where received before the index initialization
101    protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
102
103    protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
104
105    protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
106
107    protected ElasticSearchEmbeddedServerConfig embeddedServerConfig;
108
109    protected ElasticSearchClientConfig clientConfig;
110
111    protected ElasticSearchAdminImpl esa;
112
113    protected ElasticSearchIndexingImpl esi;
114
115    protected ElasticSearchServiceImpl ess;
116
117    protected JsonESDocumentWriter jsonESDocumentWriter;
118
119    protected ListeningExecutorService waiterExecutorService;
120
121    // Nuxeo Component impl ======================================é=============
122    @Override
123    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
124        switch (extensionPoint) {
125        case EP_EMBEDDED_SERVER:
126            ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution;
127            if (serverContrib.isEnabled()) {
128                embeddedServerConfig = serverContrib;
129                log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from "
130                        + contributor.getName());
131            } else if (embeddedServerConfig != null) {
132                log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName());
133                embeddedServerConfig = null;
134            }
135            break;
136        case EP_CLIENT_INIT:
137            clientConfig = (ElasticSearchClientConfig) contribution;
138            break;
139        case EP_INDEX:
140            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
141            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
142            if (idx.isEnabled()) {
143                idx.merge(previous);
144                indexConfig.put(idx.getName(), idx);
145                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
146            } else if (previous != null) {
147                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
148                indexConfig.remove(idx.getName());
149            }
150            break;
151        case EP_DOC_WRITER:
152            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
153            try {
154                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
155            } catch (IllegalAccessException | InstantiationException e) {
156                log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
157                throw new NuxeoException(e);
158            }
159            break;
160        default:
161            throw new IllegalStateException("Invalid EP: " + extensionPoint);
162        }
163    }
164
165    @Override
166    public void start(ComponentContext context) {
167        if (!isElasticsearchEnabled()) {
168            log.info("Elasticsearch service is disabled");
169            return;
170        }
171        esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig);
172        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
173        ess = new ElasticSearchServiceImpl(esa);
174        initListenerThreadPool();
175        processStackedCommands();
176        reindexOnStartup();
177    }
178
179    @Override
180    public void stop(ComponentContext context) {
181        if (esa == null) {
182            // Elasticsearch service was disabled
183            return;
184        }
185        try {
186            shutdownListenerThreadPool();
187        } finally {
188            try {
189                esa.disconnect();
190            } finally {
191                esa = null;
192                esi = null;
193                ess = null;
194            }
195        }
196    }
197
198    protected void reindexOnStartup() {
199        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
200        if (!reindexOnStartup) {
201            return;
202        }
203        for (String repositoryName : esa.getInitializedRepositories()) {
204            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
205            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
206            try {
207                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
208            } catch (InterruptedException e) {
209                Thread.currentThread().interrupt();
210            } catch (ExecutionException e) {
211                log.error(e.getMessage(), e);
212            } catch (TimeoutException e) {
213                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
214                        repositoryName, REINDEX_TIMEOUT));
215            }
216        }
217    }
218
219    protected boolean isElasticsearchEnabled() {
220        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
221    }
222
223    @Override
224    public int getApplicationStartedOrder() {
225        RepositoryService component = (RepositoryService) Framework.getRuntime()
226                                                                   .getComponent(
227                                                                           "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
228        return component.getApplicationStartedOrder() / 2;
229    }
230
231    void processStackedCommands() {
232        if (!stackedCommands.isEmpty()) {
233            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
234            runIndexingWorker(stackedCommands);
235            stackedCommands.clear();
236            log.debug("Done");
237        }
238    }
239
240    // Es Admin ================================================================
241
242    @Override
243    public ESClient getClient() {
244        return esa.getClient();
245    }
246
247    @Override
248    public void initIndexes(boolean dropIfExists) {
249        esa.initIndexes(dropIfExists);
250    }
251
252    @Override
253    public void dropAndInitIndex(String indexName) {
254        esa.dropAndInitIndex(indexName);
255    }
256
257    @Override
258    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
259        esa.dropAndInitRepositoryIndex(repositoryName, syncAlias);
260    }
261
262    @Override
263    public List<String> getRepositoryNames() {
264        return esa.getRepositoryNames();
265    }
266
267    @Override
268    public String getIndexNameForRepository(String repositoryName) {
269        return esa.getIndexNameForRepository(repositoryName);
270    }
271
272    @Override
273    public String getRepositoryForIndex(String indexName) {
274        return esa.getRepositoryForIndex(indexName);
275    }
276
277    @Override
278    public List<String> getIndexNamesForType(String type) {
279        return esa.getIndexNamesForType(type);
280    }
281
282    @Override
283    public String getIndexNameForType(String type) {
284        return esa.getIndexNameForType(type);
285    }
286
287    @Override
288    public String getWriteIndexName(String searchIndexName) {
289        return esa.getWriteIndexName(searchIndexName);
290    }
291
292    @Override
293    public void syncSearchAndWriteAlias(String searchIndexName) {
294        esa.syncSearchAndWriteAlias(searchIndexName);
295    }
296
297    @SuppressWarnings("deprecation")
298    @Override
299    public long getPendingWorkerCount() {
300        WorkManager wm = Framework.getService(WorkManager.class);
301        // api is deprecated for completed work
302        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
303    }
304
305    @SuppressWarnings("deprecation")
306    @Override
307    public long getRunningWorkerCount() {
308        WorkManager wm = Framework.getService(WorkManager.class);
309        // api is deprecated for completed work
310        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
311    }
312
313    @Override
314    public int getTotalCommandProcessed() {
315        return esa.getTotalCommandProcessed();
316    }
317
318    @Override
319    public boolean isEmbedded() {
320        return esa.isEmbedded();
321    }
322
323    @Override
324    public boolean useExternalVersion() {
325        return esa.useExternalVersion();
326    }
327
328    @Override
329    public boolean isIndexingInProgress() {
330        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
331    }
332
333    @Override
334    public ListenableFuture<Boolean> prepareWaitForIndexing() {
335        return waiterExecutorService.submit(() -> {
336            WorkManager wm = Framework.getService(WorkManager.class);
337            boolean completed;
338            do {
339                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
340            } while (!completed);
341            return true;
342        });
343    }
344
345    protected void initListenerThreadPool() {
346        waiterExecutorService = MoreExecutors.listeningDecorator(
347                Executors.newCachedThreadPool(new NamedThreadFactory()));
348    }
349
350    protected void shutdownListenerThreadPool() {
351        try {
352            waiterExecutorService.shutdown();
353        } finally {
354            waiterExecutorService = null;
355        }
356    }
357
358    @Override
359    public void refresh() {
360        esa.refresh();
361    }
362
363    @Override
364    public void refreshRepositoryIndex(String repositoryName) {
365        esa.refreshRepositoryIndex(repositoryName);
366    }
367
368    @Override
369    public void flush() {
370        esa.flush();
371    }
372
373    @Override
374    public void flushRepositoryIndex(String repositoryName) {
375        esa.flushRepositoryIndex(repositoryName);
376    }
377
378    @Override
379    public void optimize() {
380        esa.optimize();
381    }
382
383    @Override
384    public void optimizeRepositoryIndex(String repositoryName) {
385        esa.optimizeRepositoryIndex(repositoryName);
386    }
387
388    @Override
389    public void optimizeIndex(String indexName) {
390        esa.optimizeIndex(indexName);
391    }
392
393    @Override
394    public void indexNonRecursive(IndexingCommand cmd) {
395        indexNonRecursive(Collections.singletonList(cmd));
396    }
397
398    // ES Indexing =============================================================
399
400    @Override
401    public void indexNonRecursive(List<IndexingCommand> cmds) {
402        if (!isReady()) {
403            stackCommands(cmds);
404            return;
405        }
406        if (log.isDebugEnabled()) {
407            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
408        }
409        esi.indexNonRecursive(cmds);
410    }
411
412    protected void stackCommands(List<IndexingCommand> cmds) {
413        if (log.isDebugEnabled()) {
414            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
415                    + Arrays.toString(cmds.toArray()));
416        }
417        stackedCommands.addAll(cmds);
418    }
419
420    @Override
421    public void runIndexingWorker(List<IndexingCommand> cmds) {
422        if (!isReady()) {
423            stackCommands(cmds);
424            return;
425        }
426        runIndexingWorkerCount.incrementAndGet();
427        try {
428            dispatchWork(cmds);
429        } finally {
430            runIndexingWorkerCount.decrementAndGet();
431        }
432    }
433
434    /**
435     * Dispatch jobs between sync and async worker
436     */
437    protected void dispatchWork(List<IndexingCommand> cmds) {
438        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
439        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
440        for (IndexingCommand cmd : cmds) {
441            if (cmd.isSync()) {
442                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
443                if (syncCmds == null) {
444                    syncCmds = new ArrayList<>();
445                }
446                syncCmds.add(cmd);
447                syncCommands.put(cmd.getRepositoryName(), syncCmds);
448            } else {
449                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
450                if (asyncCmds == null) {
451                    asyncCmds = new ArrayList<>();
452                }
453                asyncCmds.add(cmd);
454                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
455            }
456        }
457        runIndexingSyncWorker(syncCommands);
458        scheduleIndexingAsyncWorker(asyncCommands);
459    }
460
461    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
462        if (asyncCommands.isEmpty()) {
463            return;
464        }
465        WorkManager wm = Framework.getService(WorkManager.class);
466        for (String repositoryName : asyncCommands.keySet()) {
467            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
468            // we are in afterCompletion don't wait for a commit
469            wm.schedule(idxWork, false);
470        }
471    }
472
473    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
474        if (syncCommands.isEmpty()) {
475            return;
476        }
477        Transaction transaction = TransactionHelper.suspendTransaction();
478        try {
479            for (String repositoryName : syncCommands.keySet()) {
480                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
481                idxWork.run();
482            }
483        } finally {
484            if (transaction != null) {
485                TransactionHelper.resumeTransaction(transaction);
486            }
487
488        }
489    }
490
491    @Override
492    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
493        if (nxql == null || nxql.isEmpty()) {
494            throw new IllegalArgumentException("Expecting an NXQL query");
495        }
496        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias);
497        WorkManager wm = Framework.getService(WorkManager.class);
498        wm.schedule(worker);
499    }
500
501    @Override
502    public void reindexRepository(String repositoryName) {
503        esa.dropAndInitRepositoryIndex(repositoryName, false);
504        runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true);
505    }
506
507    @Override
508    public BytesReference source(DocumentModel doc) throws IOException {
509        return esi.source(doc);
510    }
511
512    // ES Search ===============================================================
513    @Override
514    public DocumentModelList query(NxQueryBuilder queryBuilder) {
515        return ess.query(queryBuilder);
516    }
517
518    @Override
519    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
520        return ess.queryAndAggregate(queryBuilder);
521    }
522
523    @Override
524    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
525        return ess.scroll(queryBuilder, keepAlive);
526    }
527
528    @Override
529    public EsScrollResult scroll(EsScrollResult scrollResult) {
530        return ess.scroll(scrollResult);
531    }
532
533    @Override
534    public void clearScroll(EsScrollResult scrollResult) {
535        ess.clearScroll(scrollResult);
536    }
537
538    @Deprecated
539    @Override
540    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
541        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
542        return query(query);
543    }
544
545    @Deprecated
546    @Override
547    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
548            SortInfo... sortInfos) {
549        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
550                                                          .limit(limit)
551                                                          .offset(offset)
552                                                          .addSort(sortInfos);
553        return query(query);
554    }
555
556    // misc ====================================================================
557    protected boolean isReady() {
558        return (esa != null) && esa.isReady();
559    }
560
561    protected static class NamedThreadFactory implements ThreadFactory {
562        @SuppressWarnings("NullableProblems")
563        @Override
564        public Thread newThread(Runnable r) {
565            return new Thread(r, "waitForEsIndexing");
566        }
567    }
568
569}