001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.Executors;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import javax.transaction.Transaction;
040
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.elasticsearch.client.Client;
044import org.elasticsearch.index.query.QueryBuilder;
045import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
046import org.nuxeo.ecm.core.api.CoreSession;
047import org.nuxeo.ecm.core.api.DocumentModelList;
048import org.nuxeo.ecm.core.api.SortInfo;
049import org.nuxeo.ecm.core.repository.RepositoryService;
050import org.nuxeo.ecm.core.work.api.Work;
051import org.nuxeo.ecm.core.work.api.WorkManager;
052import org.nuxeo.elasticsearch.api.ESClientInitializationService;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
055import org.nuxeo.elasticsearch.api.ElasticSearchService;
056import org.nuxeo.elasticsearch.api.EsResult;
057import org.nuxeo.elasticsearch.api.EsScrollResult;
058import org.nuxeo.elasticsearch.commands.IndexingCommand;
059import org.nuxeo.elasticsearch.config.ESClientInitializationDescriptor;
060import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
061import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
062import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
063import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
064import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
065import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
066import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
067import org.nuxeo.elasticsearch.query.NxQueryBuilder;
068import org.nuxeo.elasticsearch.work.IndexingWorker;
069import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
070import org.nuxeo.runtime.api.Framework;
071import org.nuxeo.runtime.model.ComponentContext;
072import org.nuxeo.runtime.model.ComponentInstance;
073import org.nuxeo.runtime.model.DefaultComponent;
074import org.nuxeo.runtime.transaction.TransactionHelper;
075
076import com.google.common.util.concurrent.ListenableFuture;
077import com.google.common.util.concurrent.ListeningExecutorService;
078import com.google.common.util.concurrent.MoreExecutors;
079
080/**
081 * Component used to configure and manage ElasticSearch integration
082 */
083public class ElasticSearchComponent extends DefaultComponent
084        implements ElasticSearchAdmin, ElasticSearchIndexing, ElasticSearchService {
085
086    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
087
088    private static final String EP_REMOTE = "elasticSearchRemote";
089
090    private static final String EP_LOCAL = "elasticSearchLocal";
091
092    private static final String EP_INDEX = "elasticSearchIndex";
093
094    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";
095
096    private static final String EP_CLIENT_INIT = "elasticSearchClientInitialization";
097
098    private static final long REINDEX_TIMEOUT = 20;
099
100    // Indexing commands that where received before the index initialization
101    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
102
103    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
104
105    private ElasticSearchLocalConfig localConfig;
106
107    private ElasticSearchRemoteConfig remoteConfig;
108
109    private ElasticSearchAdminImpl esa;
110
111    private ElasticSearchIndexingImpl esi;
112
113    private ElasticSearchServiceImpl ess;
114
115    protected JsonESDocumentWriter jsonESDocumentWriter;
116
117    protected ESClientInitializationService clientInitService;
118
119    private ListeningExecutorService waiterExecutorService;
120
121    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
122
123    // Nuxeo Component impl ======================================é=============
124    @Override
125    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
126        switch (extensionPoint) {
127        case EP_LOCAL:
128            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
129            if (localContrib.isEnabled()) {
130                localConfig = localContrib;
131                remoteConfig = null;
132                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
133                        + contributor.getName());
134            } else if (localConfig != null) {
135                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
136                localConfig = null;
137            }
138            break;
139        case EP_REMOTE:
140            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
141            if (remoteContribution.isEnabled()) {
142                remoteConfig = remoteContribution;
143                localConfig = null;
144                log.info(
145                        "Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
146            } else if (remoteConfig != null) {
147                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
148                remoteConfig = null;
149            }
150            break;
151        case EP_INDEX:
152            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
153            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
154            if (idx.isEnabled()) {
155                idx.merge(previous);
156                indexConfig.put(idx.getName(), idx);
157                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
158            } else if (previous != null) {
159                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
160                indexConfig.remove(idx.getName());
161            }
162            break;
163        case EP_DOC_WRITER:
164            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
165            try {
166                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
167            } catch (IllegalAccessException | InstantiationException e) {
168                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
169                throw new RuntimeException(e);
170            }
171            break;
172        case EP_CLIENT_INIT:
173            ESClientInitializationDescriptor clientInitDescriptor = (ESClientInitializationDescriptor) contribution;
174            try {
175                clientInitService = clientInitDescriptor.getKlass().newInstance();
176                clientInitService.setUsername(clientInitDescriptor.getUsername());
177                clientInitService.setPassword(clientInitDescriptor.getPassword());
178                clientInitService.setSslKeystorePath(clientInitDescriptor.getSslKeystorePath());
179                clientInitService.setSslKeystorePassword(clientInitDescriptor.getSslKeystorePassword());
180            } catch (IllegalAccessException | InstantiationException e) {
181                log.error(
182                        "Can not instantiate ES Client initialization service from " + clientInitDescriptor.getKlass());
183                throw new RuntimeException(e);
184            }
185            break;
186        default:
187            throw new IllegalStateException("Invalid EP: " + extensionPoint);
188        }
189    }
190
191    @Override
192    public void start(ComponentContext context) {
193        if (!isElasticsearchEnabled()) {
194            log.info("Elasticsearch service is disabled");
195            return;
196        }
197        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig, clientInitService);
198        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
199        ess = new ElasticSearchServiceImpl(esa);
200        initListenerThreadPool();
201        processStackedCommands();
202        reindexOnStartup();
203    }
204
205    @Override
206    public void stop(ComponentContext context) {
207        try {
208            shutdownListenerThreadPool();
209        } finally {
210            try {
211                esa.disconnect();
212            } finally {
213                esa = null;
214                esi = null;
215                ess = null;
216            }
217        }
218    }
219
220    private void reindexOnStartup() {
221        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
222        if (!reindexOnStartup) {
223            return;
224        }
225        for (String repositoryName : esa.getInitializedRepositories()) {
226            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
227            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
228            try {
229                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
230            } catch (InterruptedException e) {
231                Thread.currentThread().interrupt();
232            } catch (ExecutionException e) {
233                log.error(e.getMessage(), e);
234            } catch (TimeoutException e) {
235                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
236                        repositoryName, REINDEX_TIMEOUT));
237            }
238        }
239    }
240
241    protected boolean isElasticsearchEnabled() {
242        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
243    }
244
245    @Override
246    public int getApplicationStartedOrder() {
247        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
248                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
249        return component.getApplicationStartedOrder() / 2;
250    }
251
252    void processStackedCommands() {
253        if (!stackedCommands.isEmpty()) {
254            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
255            runIndexingWorker(stackedCommands);
256            stackedCommands.clear();
257            log.debug("Done");
258        }
259    }
260
261    // Es Admin ================================================================
262
263    @Override
264    public Client getClient() {
265        return esa.getClient();
266    }
267
268    @Override
269    public void initIndexes(boolean dropIfExists) {
270        esa.initIndexes(dropIfExists);
271    }
272
273    @Override
274    public void dropAndInitIndex(String indexName) {
275        esa.dropAndInitIndex(indexName);
276    }
277
278    @Override
279    public void dropAndInitRepositoryIndex(String repositoryName) {
280        esa.dropAndInitRepositoryIndex(repositoryName);
281    }
282
283    @Override
284    public List<String> getRepositoryNames() {
285        return esa.getRepositoryNames();
286    }
287
288    @Override
289    public String getIndexNameForRepository(String repositoryName) {
290        return esa.getIndexNameForRepository(repositoryName);
291    }
292
293    @Override
294    public List<String> getIndexNamesForType(String type) {
295        return esa.getIndexNamesForType(type);
296    }
297
298    @Override
299    public String getIndexNameForType(String type) {
300        return esa.getIndexNameForType(type);
301    }
302
303    @SuppressWarnings("deprecation")
304    @Override
305    public long getPendingWorkerCount() {
306        WorkManager wm = Framework.getLocalService(WorkManager.class);
307        // api is deprecated for completed work
308        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
309    }
310
311    @SuppressWarnings("deprecation")
312    @Override
313    public long getRunningWorkerCount() {
314        WorkManager wm = Framework.getLocalService(WorkManager.class);
315        // api is deprecated for completed work
316        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
317    }
318
319    @Override
320    public int getTotalCommandProcessed() {
321        return esa.getTotalCommandProcessed();
322    }
323
324    @Override
325    public boolean isEmbedded() {
326        return esa.isEmbedded();
327    }
328
329    @Override
330    public boolean useExternalVersion() {
331        return esa.useExternalVersion();
332    }
333
334    @Override
335    public boolean isIndexingInProgress() {
336        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
337    }
338
339    @Override
340    public ListenableFuture<Boolean> prepareWaitForIndexing() {
341        return waiterExecutorService.submit(() -> {
342            WorkManager wm = Framework.getLocalService(WorkManager.class);
343            boolean completed = false;
344            do {
345                completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
346            } while (!completed);
347            return true;
348        });
349    }
350
351    private static class NamedThreadFactory implements ThreadFactory {
352        @SuppressWarnings("NullableProblems")
353        @Override
354        public Thread newThread(Runnable r) {
355            return new Thread(r, "waitForEsIndexing");
356        }
357    }
358
359    protected void initListenerThreadPool() {
360        waiterExecutorService = MoreExecutors.listeningDecorator(
361                Executors.newCachedThreadPool(new NamedThreadFactory()));
362    }
363
364    protected void shutdownListenerThreadPool() {
365        try {
366            waiterExecutorService.shutdown();
367        } finally {
368            waiterExecutorService = null;
369        }
370    }
371
372    @Override
373    public void refresh() {
374        esa.refresh();
375    }
376
377    @Override
378    public void refreshRepositoryIndex(String repositoryName) {
379        esa.refreshRepositoryIndex(repositoryName);
380    }
381
382    @Override
383    public void flush() {
384        esa.flush();
385    }
386
387    @Override
388    public void flushRepositoryIndex(String repositoryName) {
389        esa.flushRepositoryIndex(repositoryName);
390    }
391
392    @Override
393    public void optimize() {
394        esa.optimize();
395    }
396
397    @Override
398    public void optimizeRepositoryIndex(String repositoryName) {
399        esa.optimizeRepositoryIndex(repositoryName);
400    }
401
402    @Override
403    public void optimizeIndex(String indexName) {
404        esa.optimizeIndex(indexName);
405    }
406
407    // ES Indexing =============================================================
408
409    @Override
410    public void indexNonRecursive(IndexingCommand cmd) {
411        indexNonRecursive(Collections.singletonList(cmd));
412    }
413
414    @Override
415    public void indexNonRecursive(List<IndexingCommand> cmds) {
416        if (!isReady()) {
417            stackCommands(cmds);
418            return;
419        }
420        if (log.isDebugEnabled()) {
421            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
422        }
423        esi.indexNonRecursive(cmds);
424    }
425
426    protected void stackCommands(List<IndexingCommand> cmds) {
427        if (log.isDebugEnabled()) {
428            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
429                    + Arrays.toString(cmds.toArray()));
430        }
431        stackedCommands.addAll(cmds);
432    }
433
434    @Override
435    public void runIndexingWorker(List<IndexingCommand> cmds) {
436        if (!isReady()) {
437            stackCommands(cmds);
438            return;
439        }
440        runIndexingWorkerCount.incrementAndGet();
441        try {
442            dispatchWork(cmds);
443        } finally {
444            runIndexingWorkerCount.decrementAndGet();
445        }
446    }
447
448    /**
449     * Dispatch jobs between sync and async worker
450     */
451    protected void dispatchWork(List<IndexingCommand> cmds) {
452        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
453        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
454        for (IndexingCommand cmd : cmds) {
455            if (cmd.isSync()) {
456                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
457                if (syncCmds == null) {
458                    syncCmds = new ArrayList<>();
459                }
460                syncCmds.add(cmd);
461                syncCommands.put(cmd.getRepositoryName(), syncCmds);
462            } else {
463                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
464                if (asyncCmds == null) {
465                    asyncCmds = new ArrayList<>();
466                }
467                asyncCmds.add(cmd);
468                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
469            }
470        }
471        runIndexingSyncWorker(syncCommands);
472        scheduleIndexingAsyncWorker(asyncCommands);
473    }
474
475    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
476        if (asyncCommands.isEmpty()) {
477            return;
478        }
479        WorkManager wm = Framework.getLocalService(WorkManager.class);
480        for (String repositoryName : asyncCommands.keySet()) {
481            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
482            // we are in afterCompletion don't wait for a commit
483            wm.schedule(idxWork, false);
484        }
485    }
486
487    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
488        if (syncCommands.isEmpty()) {
489            return;
490        }
491        Transaction transaction = TransactionHelper.suspendTransaction();
492        try {
493            for (String repositoryName : syncCommands.keySet()) {
494                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
495                idxWork.run();
496            }
497        } finally {
498            if (transaction != null) {
499                TransactionHelper.resumeTransaction(transaction);
500            }
501
502        }
503    }
504
505    @Override
506    public void runReindexingWorker(String repositoryName, String nxql) {
507        if (nxql == null || nxql.isEmpty()) {
508            throw new IllegalArgumentException("Expecting an NXQL query");
509        }
510        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
511        WorkManager wm = Framework.getLocalService(WorkManager.class);
512        wm.schedule(worker);
513    }
514
515    // ES Search ===============================================================
516    @Override
517    public DocumentModelList query(NxQueryBuilder queryBuilder) {
518        return ess.query(queryBuilder);
519    }
520
521    @Override
522    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
523        return ess.queryAndAggregate(queryBuilder);
524    }
525
526    @Override
527    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
528        return ess.scroll(queryBuilder, keepAlive);
529    }
530
531    @Override
532    public EsScrollResult scroll(EsScrollResult scrollResult) {
533        return ess.scroll(scrollResult);
534    }
535
536    @Override
537    public void clearScroll(EsScrollResult scrollResult) {
538        ess.clearScroll(scrollResult);
539    }
540
541    @Deprecated
542    @Override
543    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
544        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
545        return query(query);
546    }
547
548    @Deprecated
549    @Override
550    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
551            SortInfo... sortInfos) {
552        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
553                                                          .limit(limit)
554                                                          .offset(offset)
555                                                          .addSort(sortInfos);
556        return query(query);
557    }
558
559    // misc ====================================================================
560    private boolean isReady() {
561        return (esa != null) && esa.isReady();
562    }
563
564}