001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.Executors;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import javax.transaction.Transaction;
040
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.elasticsearch.index.query.QueryBuilder;
044import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
045import org.nuxeo.ecm.core.api.CoreSession;
046import org.nuxeo.ecm.core.api.DocumentModelList;
047import org.nuxeo.ecm.core.api.NuxeoException;
048import org.nuxeo.ecm.core.api.SortInfo;
049import org.nuxeo.ecm.core.repository.RepositoryService;
050import org.nuxeo.ecm.core.work.api.Work;
051import org.nuxeo.ecm.core.work.api.WorkManager;
052import org.nuxeo.elasticsearch.api.ESClient;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
055import org.nuxeo.elasticsearch.api.ElasticSearchService;
056import org.nuxeo.elasticsearch.api.EsResult;
057import org.nuxeo.elasticsearch.api.EsScrollResult;
058import org.nuxeo.elasticsearch.commands.IndexingCommand;
059import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
060import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
061import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
062import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
063import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
064import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
065import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
066import org.nuxeo.elasticsearch.query.NxQueryBuilder;
067import org.nuxeo.elasticsearch.work.IndexingWorker;
068import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
069import org.nuxeo.runtime.api.Framework;
070import org.nuxeo.runtime.model.ComponentContext;
071import org.nuxeo.runtime.model.ComponentInstance;
072import org.nuxeo.runtime.model.DefaultComponent;
073import org.nuxeo.runtime.transaction.TransactionHelper;
074
075import com.google.common.util.concurrent.ListenableFuture;
076import com.google.common.util.concurrent.ListeningExecutorService;
077import com.google.common.util.concurrent.MoreExecutors;
078
079/**
080 * Component used to configure and manage ElasticSearch integration
081 */
082public class ElasticSearchComponent extends DefaultComponent
083        implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService {
084
085    protected static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
086
087    protected static final String EP_EMBEDDED_SERVER = "elasticSearchEmbeddedServer";
088
089    protected static final String EP_CLIENT_INIT = "elasticSearchClient";
090
091    protected static final String EP_INDEX = "elasticSearchIndex";
092
093    protected static final String EP_DOC_WRITER = "elasticSearchDocWriter";
094
095    protected static final long REINDEX_TIMEOUT = 20;
096
097    // Indexing commands that where received before the index initialization
098    protected final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
099
100    protected final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
101
102    protected final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
103
104    protected ElasticSearchEmbeddedServerConfig embeddedServerConfig;
105
106    protected ElasticSearchClientConfig clientConfig;
107
108    protected ElasticSearchAdminImpl esa;
109
110    protected ElasticSearchIndexingImpl esi;
111
112    protected ElasticSearchServiceImpl ess;
113
114    protected JsonESDocumentWriter jsonESDocumentWriter;
115
116    protected ListeningExecutorService waiterExecutorService;
117
118    // Nuxeo Component impl ======================================é=============
119    @Override
120    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
121        switch (extensionPoint) {
122        case EP_EMBEDDED_SERVER:
123            ElasticSearchEmbeddedServerConfig serverContrib = (ElasticSearchEmbeddedServerConfig) contribution;
124            if (serverContrib.isEnabled()) {
125                embeddedServerConfig = serverContrib;
126                log.info("Registering embedded server configuration: " + embeddedServerConfig + ", loaded from "
127                        + contributor.getName());
128            } else if (embeddedServerConfig != null) {
129                log.info("Disabling previous embedded server configuration, deactivated by " + contributor.getName());
130                embeddedServerConfig = null;
131            }
132            break;
133        case EP_CLIENT_INIT:
134            clientConfig = (ElasticSearchClientConfig) contribution;
135            break;
136        case EP_INDEX:
137            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
138            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
139            if (idx.isEnabled()) {
140                idx.merge(previous);
141                indexConfig.put(idx.getName(), idx);
142                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
143            } else if (previous != null) {
144                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
145                indexConfig.remove(idx.getName());
146            }
147            break;
148        case EP_DOC_WRITER:
149            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
150            try {
151                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
152            } catch (IllegalAccessException | InstantiationException e) {
153                log.error("Cannot instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
154                throw new NuxeoException(e);
155            }
156            break;
157        default:
158            throw new IllegalStateException("Invalid EP: " + extensionPoint);
159        }
160    }
161
162    @Override
163    public void start(ComponentContext context) {
164        if (!isElasticsearchEnabled()) {
165            log.info("Elasticsearch service is disabled");
166            return;
167        }
168        esa = new ElasticSearchAdminImpl(embeddedServerConfig, clientConfig, indexConfig);
169        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
170        ess = new ElasticSearchServiceImpl(esa);
171        initListenerThreadPool();
172        processStackedCommands();
173        reindexOnStartup();
174    }
175
176    @Override
177    public void stop(ComponentContext context) {
178        if (esa == null) {
179            // Elasticsearch service was disabled
180            return;
181        }
182        try {
183            shutdownListenerThreadPool();
184        } finally {
185            try {
186                esa.disconnect();
187            } finally {
188                esa = null;
189                esi = null;
190                ess = null;
191            }
192        }
193    }
194
195    protected void reindexOnStartup() {
196        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
197        if (!reindexOnStartup) {
198            return;
199        }
200        for (String repositoryName : esa.getInitializedRepositories()) {
201            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
202            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
203            try {
204                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
205            } catch (InterruptedException e) {
206                Thread.currentThread().interrupt();
207            } catch (ExecutionException e) {
208                log.error(e.getMessage(), e);
209            } catch (TimeoutException e) {
210                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
211                        repositoryName, REINDEX_TIMEOUT));
212            }
213        }
214    }
215
216    protected boolean isElasticsearchEnabled() {
217        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
218    }
219
220    @Override
221    public int getApplicationStartedOrder() {
222        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
223                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
224        return component.getApplicationStartedOrder() / 2;
225    }
226
227    void processStackedCommands() {
228        if (!stackedCommands.isEmpty()) {
229            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
230            runIndexingWorker(stackedCommands);
231            stackedCommands.clear();
232            log.debug("Done");
233        }
234    }
235
236    // Es Admin ================================================================
237
238    @Override
239    public ESClient getClient() {
240        return esa.getClient();
241    }
242
243    @Override
244    public void initIndexes(boolean dropIfExists) {
245        esa.initIndexes(dropIfExists);
246    }
247
248    @Override
249    public void dropAndInitIndex(String indexName) {
250        esa.dropAndInitIndex(indexName);
251    }
252
253    @Override
254    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
255        esa.dropAndInitRepositoryIndex(repositoryName, syncAlias);
256    }
257
258    @Override
259    public List<String> getRepositoryNames() {
260        return esa.getRepositoryNames();
261    }
262
263    @Override
264    public String getIndexNameForRepository(String repositoryName) {
265        return esa.getIndexNameForRepository(repositoryName);
266    }
267
268    @Override
269    public String getRepositoryForIndex(String indexName) {
270        return esa.getRepositoryForIndex(indexName);
271    }
272
273    @Override
274    public List<String> getIndexNamesForType(String type) {
275        return esa.getIndexNamesForType(type);
276    }
277
278    @Override
279    public String getIndexNameForType(String type) {
280        return esa.getIndexNameForType(type);
281    }
282
283    @Override
284    public String getWriteIndexName(String searchIndexName) {
285        return esa.getWriteIndexName(searchIndexName);
286    }
287
288    @Override
289    public void syncSearchAndWriteAlias(String searchIndexName) {
290        esa.syncSearchAndWriteAlias(searchIndexName);
291    }
292
293    @SuppressWarnings("deprecation")
294    @Override
295    public long getPendingWorkerCount() {
296        WorkManager wm = Framework.getService(WorkManager.class);
297        // api is deprecated for completed work
298        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
299    }
300
301    @SuppressWarnings("deprecation")
302    @Override
303    public long getRunningWorkerCount() {
304        WorkManager wm = Framework.getService(WorkManager.class);
305        // api is deprecated for completed work
306        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
307    }
308
309    @Override
310    public int getTotalCommandProcessed() {
311        return esa.getTotalCommandProcessed();
312    }
313
314    @Override
315    public boolean isEmbedded() {
316        return esa.isEmbedded();
317    }
318
319    @Override
320    public boolean useExternalVersion() {
321        return esa.useExternalVersion();
322    }
323
324    @Override
325    public boolean isIndexingInProgress() {
326        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
327    }
328
329    @Override
330    public ListenableFuture<Boolean> prepareWaitForIndexing() {
331        return waiterExecutorService.submit(() -> {
332            WorkManager wm = Framework.getService(WorkManager.class);
333            boolean completed;
334            do {
335                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
336            } while (!completed);
337            return true;
338        });
339    }
340
341    protected void initListenerThreadPool() {
342        waiterExecutorService = MoreExecutors.listeningDecorator(
343                Executors.newCachedThreadPool(new NamedThreadFactory()));
344    }
345
346    protected void shutdownListenerThreadPool() {
347        try {
348            waiterExecutorService.shutdown();
349        } finally {
350            waiterExecutorService = null;
351        }
352    }
353
354    @Override
355    public void refresh() {
356        esa.refresh();
357    }
358
359    @Override
360    public void refreshRepositoryIndex(String repositoryName) {
361        esa.refreshRepositoryIndex(repositoryName);
362    }
363
364    @Override
365    public void flush() {
366        esa.flush();
367    }
368
369    @Override
370    public void flushRepositoryIndex(String repositoryName) {
371        esa.flushRepositoryIndex(repositoryName);
372    }
373
374    @Override
375    public void optimize() {
376        esa.optimize();
377    }
378
379    @Override
380    public void optimizeRepositoryIndex(String repositoryName) {
381        esa.optimizeRepositoryIndex(repositoryName);
382    }
383
384    @Override
385    public void optimizeIndex(String indexName) {
386        esa.optimizeIndex(indexName);
387    }
388
389    @Override
390    public void indexNonRecursive(IndexingCommand cmd) {
391        indexNonRecursive(Collections.singletonList(cmd));
392    }
393
394    // ES Indexing =============================================================
395
396    @Override
397    public void indexNonRecursive(List<IndexingCommand> cmds) {
398        if (!isReady()) {
399            stackCommands(cmds);
400            return;
401        }
402        if (log.isDebugEnabled()) {
403            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
404        }
405        esi.indexNonRecursive(cmds);
406    }
407
408    protected void stackCommands(List<IndexingCommand> cmds) {
409        if (log.isDebugEnabled()) {
410            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
411                    + Arrays.toString(cmds.toArray()));
412        }
413        stackedCommands.addAll(cmds);
414    }
415
416    @Override
417    public void runIndexingWorker(List<IndexingCommand> cmds) {
418        if (!isReady()) {
419            stackCommands(cmds);
420            return;
421        }
422        runIndexingWorkerCount.incrementAndGet();
423        try {
424            dispatchWork(cmds);
425        } finally {
426            runIndexingWorkerCount.decrementAndGet();
427        }
428    }
429
430    /**
431     * Dispatch jobs between sync and async worker
432     */
433    protected void dispatchWork(List<IndexingCommand> cmds) {
434        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
435        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
436        for (IndexingCommand cmd : cmds) {
437            if (cmd.isSync()) {
438                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
439                if (syncCmds == null) {
440                    syncCmds = new ArrayList<>();
441                }
442                syncCmds.add(cmd);
443                syncCommands.put(cmd.getRepositoryName(), syncCmds);
444            } else {
445                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
446                if (asyncCmds == null) {
447                    asyncCmds = new ArrayList<>();
448                }
449                asyncCmds.add(cmd);
450                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
451            }
452        }
453        runIndexingSyncWorker(syncCommands);
454        scheduleIndexingAsyncWorker(asyncCommands);
455    }
456
457    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
458        if (asyncCommands.isEmpty()) {
459            return;
460        }
461        WorkManager wm = Framework.getService(WorkManager.class);
462        for (String repositoryName : asyncCommands.keySet()) {
463            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
464            // we are in afterCompletion don't wait for a commit
465            wm.schedule(idxWork, false);
466        }
467    }
468
469    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
470        if (syncCommands.isEmpty()) {
471            return;
472        }
473        Transaction transaction = TransactionHelper.suspendTransaction();
474        try {
475            for (String repositoryName : syncCommands.keySet()) {
476                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
477                idxWork.run();
478            }
479        } finally {
480            if (transaction != null) {
481                TransactionHelper.resumeTransaction(transaction);
482            }
483
484        }
485    }
486
487    @Override
488    public void runReindexingWorker(String repositoryName, String nxql, boolean syncAlias) {
489        if (nxql == null || nxql.isEmpty()) {
490            throw new IllegalArgumentException("Expecting an NXQL query");
491        }
492        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql, syncAlias);
493        WorkManager wm = Framework.getService(WorkManager.class);
494        wm.schedule(worker);
495    }
496
497    @Override
498    public void reindexRepository(String repositoryName) {
499        esa.dropAndInitRepositoryIndex(repositoryName, false);
500        runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document", true);
501    }
502
503    // ES Search ===============================================================
504    @Override
505    public DocumentModelList query(NxQueryBuilder queryBuilder) {
506        return ess.query(queryBuilder);
507    }
508
509    @Override
510    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
511        return ess.queryAndAggregate(queryBuilder);
512    }
513
514    @Override
515    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
516        return ess.scroll(queryBuilder, keepAlive);
517    }
518
519    @Override
520    public EsScrollResult scroll(EsScrollResult scrollResult) {
521        return ess.scroll(scrollResult);
522    }
523
524    @Override
525    public void clearScroll(EsScrollResult scrollResult) {
526        ess.clearScroll(scrollResult);
527    }
528
529    @Deprecated
530    @Override
531    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
532        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
533        return query(query);
534    }
535
536    @Deprecated
537    @Override
538    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
539            SortInfo... sortInfos) {
540        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
541                sortInfos);
542        return query(query);
543    }
544
545    // misc ====================================================================
546    protected boolean isReady() {
547        return (esa != null) && esa.isReady();
548    }
549
550    protected static class NamedThreadFactory implements ThreadFactory {
551        @SuppressWarnings("NullableProblems")
552        @Override
553        public Thread newThread(Runnable r) {
554            return new Thread(r, "waitForEsIndexing");
555        }
556    }
557
558}