001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.Executors;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import javax.transaction.Transaction;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.elasticsearch.client.Client;
045import org.elasticsearch.index.query.QueryBuilder;
046import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.DocumentModelList;
049import org.nuxeo.ecm.core.api.SortInfo;
050import org.nuxeo.ecm.core.repository.RepositoryService;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.elasticsearch.api.ESClientInitializationService;
054import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
055import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
056import org.nuxeo.elasticsearch.api.ElasticSearchService;
057import org.nuxeo.elasticsearch.api.EsResult;
058import org.nuxeo.elasticsearch.api.EsScrollResult;
059import org.nuxeo.elasticsearch.commands.IndexingCommand;
060import org.nuxeo.elasticsearch.config.ESClientInitializationDescriptor;
061import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
062import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
063import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
064import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
065import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
066import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
067import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
068import org.nuxeo.elasticsearch.query.NxQueryBuilder;
069import org.nuxeo.elasticsearch.work.IndexingWorker;
070import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
071import org.nuxeo.runtime.api.Framework;
072import org.nuxeo.runtime.model.ComponentContext;
073import org.nuxeo.runtime.model.ComponentInstance;
074import org.nuxeo.runtime.model.DefaultComponent;
075import org.nuxeo.runtime.transaction.TransactionHelper;
076
077import com.google.common.util.concurrent.ListenableFuture;
078import com.google.common.util.concurrent.ListeningExecutorService;
079import com.google.common.util.concurrent.MoreExecutors;
080
081/**
082 * Component used to configure and manage ElasticSearch integration
083 */
084public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
085        ElasticSearchService {
086
087    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
088
089    private static final String EP_REMOTE = "elasticSearchRemote";
090
091    private static final String EP_LOCAL = "elasticSearchLocal";
092
093    private static final String EP_INDEX = "elasticSearchIndex";
094
095    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";
096
097    private static final String EP_CLIENT_INIT = "elasticSearchClientInitialization";
098
099    private static final long REINDEX_TIMEOUT = 20;
100
101    // Indexing commands that where received before the index initialization
102    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
103
104    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
105
106    private ElasticSearchLocalConfig localConfig;
107
108    private ElasticSearchRemoteConfig remoteConfig;
109
110    private ElasticSearchAdminImpl esa;
111
112    private ElasticSearchIndexingImpl esi;
113
114    private ElasticSearchServiceImpl ess;
115
116    protected JsonESDocumentWriter jsonESDocumentWriter;
117
118    protected ESClientInitializationService clientInitService;
119
120    private ListeningExecutorService waiterExecutorService;
121
122    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
123
124    // Nuxeo Component impl ======================================é=============
125    @Override
126    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
127        switch (extensionPoint) {
128        case EP_LOCAL:
129            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
130            if (localContrib.isEnabled()) {
131                localConfig = localContrib;
132                remoteConfig = null;
133                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
134                        + contributor.getName());
135            } else if (localConfig != null) {
136                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
137                localConfig = null;
138            }
139            break;
140        case EP_REMOTE:
141            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
142            if (remoteContribution.isEnabled()) {
143                remoteConfig = remoteContribution;
144                localConfig = null;
145                log.info(
146                        "Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
147            } else if (remoteConfig != null) {
148                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
149                remoteConfig = null;
150            }
151            break;
152        case EP_INDEX:
153            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
154            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
155            if (idx.isEnabled()) {
156                idx.merge(previous);
157                indexConfig.put(idx.getName(), idx);
158                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
159            } else if (previous != null) {
160                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
161                indexConfig.remove(idx.getName());
162            }
163            break;
164        case EP_DOC_WRITER:
165            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
166            try {
167                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
168            } catch (IllegalAccessException | InstantiationException e) {
169                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
170                throw new RuntimeException(e);
171            }
172            break;
173        case EP_CLIENT_INIT:
174            ESClientInitializationDescriptor clientInitDescriptor = (ESClientInitializationDescriptor) contribution;
175            try {
176                clientInitService = clientInitDescriptor.getKlass().newInstance();
177                clientInitService.setUsername(clientInitDescriptor.getUsername());
178                clientInitService.setPassword(clientInitDescriptor.getPassword());
179            } catch (IllegalAccessException | InstantiationException e) {
180                log.error(
181                        "Can not instantiate ES Client initialization service from " + clientInitDescriptor.getKlass());
182                throw new RuntimeException(e);
183            }
184            break;
185        default:
186            throw new IllegalStateException("Invalid EP: " + extensionPoint);
187        }
188    }
189
190    @Override
191    public void applicationStarted(ComponentContext context) {
192        if (!isElasticsearchEnabled()) {
193            log.info("Elasticsearch service is disabled");
194            return;
195        }
196        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig, clientInitService);
197        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
198        ess = new ElasticSearchServiceImpl(esa);
199        initListenerThreadPool();
200        processStackedCommands();
201        reindexOnStartup();
202    }
203
204    private void reindexOnStartup() {
205        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
206        if (!reindexOnStartup) {
207            return;
208        }
209        for (String repositoryName : esa.getInitializedRepositories()) {
210            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
211            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
212            try {
213                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
214            } catch (InterruptedException e) {
215                Thread.currentThread().interrupt();
216            } catch (ExecutionException e) {
217                log.error(e.getMessage(), e);
218            } catch (TimeoutException e) {
219                log.warn(String.format("Indexation of repository %s not finished after %d s, continuing in background",
220                        repositoryName, REINDEX_TIMEOUT));
221            }
222        }
223    }
224
225    protected boolean isElasticsearchEnabled() {
226        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
227    }
228
229    @Override
230    public void deactivate(ComponentContext context) {
231        if (esa != null) {
232            esa.disconnect();
233        }
234    }
235
236    @Override
237    public int getApplicationStartedOrder() {
238        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
239                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
240        return component.getApplicationStartedOrder() / 2;
241    }
242
243    void processStackedCommands() {
244        if (!stackedCommands.isEmpty()) {
245            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
246            runIndexingWorker(stackedCommands);
247            stackedCommands.clear();
248            log.debug("Done");
249        }
250    }
251
252    // Es Admin ================================================================
253
254    @Override
255    public Client getClient() {
256        return esa.getClient();
257    }
258
259    @Override
260    public void initIndexes(boolean dropIfExists) {
261        esa.initIndexes(dropIfExists);
262    }
263
264    @Override
265    public void dropAndInitIndex(String indexName) {
266        esa.dropAndInitIndex(indexName);
267    }
268
269    @Override
270    public void dropAndInitRepositoryIndex(String repositoryName) {
271        esa.dropAndInitRepositoryIndex(repositoryName);
272    }
273
274    @Override
275    public List<String> getRepositoryNames() {
276        return esa.getRepositoryNames();
277    }
278
279    @Override
280    public String getIndexNameForRepository(String repositoryName) {
281        return esa.getIndexNameForRepository(repositoryName);
282    }
283
284    @Override
285    public List<String> getIndexNamesForType(String type) {
286        return esa.getIndexNamesForType(type);
287    }
288
289    @Override
290    public String getIndexNameForType(String type) {
291        return esa.getIndexNameForType(type);
292    }
293
294    @SuppressWarnings("deprecation")
295    @Override
296    public long getPendingWorkerCount() {
297        WorkManager wm = Framework.getLocalService(WorkManager.class);
298        // api is deprecated for completed work
299        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
300    }
301
302    @SuppressWarnings("deprecation")
303    @Override
304    public long getRunningWorkerCount() {
305        WorkManager wm = Framework.getLocalService(WorkManager.class);
306        // api is deprecated for completed work
307        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
308    }
309
310    @Override
311    public int getTotalCommandProcessed() {
312        return esa.getTotalCommandProcessed();
313    }
314
315    @Override
316    public boolean isEmbedded() {
317        return esa.isEmbedded();
318    }
319
320    @Override
321    public boolean useExternalVersion() {
322        return esa.useExternalVersion();
323    }
324
325    @Override
326    public boolean isIndexingInProgress() {
327        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
328    }
329
330    @Override
331    public ListenableFuture<Boolean> prepareWaitForIndexing() {
332        return waiterExecutorService.submit(new Callable<Boolean>() {
333            @Override
334            public Boolean call() throws Exception {
335                WorkManager wm = Framework.getLocalService(WorkManager.class);
336                boolean completed = false;
337                do {
338                    completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
339                } while (!completed);
340                return true;
341            }
342        });
343    }
344
345    private static class NamedThreadFactory implements ThreadFactory {
346        @SuppressWarnings("NullableProblems")
347        @Override
348        public Thread newThread(Runnable r) {
349            return new Thread(r, "waitForEsIndexing");
350        }
351    }
352
353    protected void initListenerThreadPool() {
354        waiterExecutorService = MoreExecutors.listeningDecorator(
355                Executors.newCachedThreadPool(new NamedThreadFactory()));
356    }
357
358    @Override
359    public void refresh() {
360        esa.refresh();
361    }
362
363    @Override
364    public void refreshRepositoryIndex(String repositoryName) {
365        esa.refreshRepositoryIndex(repositoryName);
366    }
367
368    @Override
369    public void flush() {
370        esa.flush();
371    }
372
373    @Override
374    public void flushRepositoryIndex(String repositoryName) {
375        esa.flushRepositoryIndex(repositoryName);
376    }
377
378    @Override
379    public void optimize() {
380        esa.optimize();
381    }
382
383    @Override
384    public void optimizeRepositoryIndex(String repositoryName) {
385        esa.optimizeRepositoryIndex(repositoryName);
386    }
387
388    @Override
389    public void optimizeIndex(String indexName) {
390        esa.optimizeIndex(indexName);
391    }
392
393    // ES Indexing =============================================================
394
395    @Override
396    public void indexNonRecursive(IndexingCommand cmd) {
397        indexNonRecursive(Collections.singletonList(cmd));
398    }
399
400    @Override
401    public void indexNonRecursive(List<IndexingCommand> cmds) {
402        if (!isReady()) {
403            stackCommands(cmds);
404            return;
405        }
406        if (log.isDebugEnabled()) {
407            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
408        }
409        esi.indexNonRecursive(cmds);
410    }
411
412    protected void stackCommands(List<IndexingCommand> cmds) {
413        if (log.isDebugEnabled()) {
414            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
415                    + Arrays.toString(cmds.toArray()));
416        }
417        stackedCommands.addAll(cmds);
418    }
419
420    @Override
421    public void runIndexingWorker(List<IndexingCommand> cmds) {
422        if (!isReady()) {
423            stackCommands(cmds);
424            return;
425        }
426        runIndexingWorkerCount.incrementAndGet();
427        try {
428            dispatchWork(cmds);
429        } finally {
430            runIndexingWorkerCount.decrementAndGet();
431        }
432    }
433
434    /**
435     * Dispatch jobs between sync and async worker
436     */
437    protected void dispatchWork(List<IndexingCommand> cmds) {
438        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
439        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
440        for (IndexingCommand cmd : cmds) {
441            if (cmd.isSync()) {
442                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
443                if (syncCmds == null) {
444                    syncCmds = new ArrayList<>();
445                }
446                syncCmds.add(cmd);
447                syncCommands.put(cmd.getRepositoryName(), syncCmds);
448            } else {
449                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
450                if (asyncCmds == null) {
451                    asyncCmds = new ArrayList<>();
452                }
453                asyncCmds.add(cmd);
454                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
455            }
456        }
457        runIndexingSyncWorker(syncCommands);
458        scheduleIndexingAsyncWorker(asyncCommands);
459    }
460
461    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
462        if (asyncCommands.isEmpty()) {
463            return;
464        }
465        WorkManager wm = Framework.getLocalService(WorkManager.class);
466        for (String repositoryName : asyncCommands.keySet()) {
467            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
468            // we are in afterCompletion don't wait for a commit
469            wm.schedule(idxWork, false);
470        }
471    }
472
473    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
474        if (syncCommands.isEmpty()) {
475            return;
476        }
477        Transaction transaction = TransactionHelper.suspendTransaction();
478        try {
479            for (String repositoryName : syncCommands.keySet()) {
480                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
481                idxWork.run();
482            }
483        } finally {
484            if (transaction != null) {
485                TransactionHelper.resumeTransaction(transaction);
486            }
487
488        }
489    }
490
491    @Override
492    public void runReindexingWorker(String repositoryName, String nxql) {
493        if (nxql == null || nxql.isEmpty()) {
494            throw new IllegalArgumentException("Expecting an NXQL query");
495        }
496        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
497        WorkManager wm = Framework.getLocalService(WorkManager.class);
498        wm.schedule(worker);
499    }
500
501    // ES Search ===============================================================
502    @Override
503    public DocumentModelList query(NxQueryBuilder queryBuilder) {
504        return ess.query(queryBuilder);
505    }
506
507    @Override
508    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
509        return ess.queryAndAggregate(queryBuilder);
510    }
511
512    @Override
513    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
514        return ess.scroll(queryBuilder, keepAlive);
515    }
516
517    @Override
518    public EsScrollResult scroll(EsScrollResult scrollResult) {
519        return ess.scroll(scrollResult);
520    }
521
522    @Override
523    public void clearScroll(EsScrollResult scrollResult) {
524        ess.clearScroll(scrollResult);
525    }
526
527    @Deprecated
528    @Override
529    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
530        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
531        return query(query);
532    }
533
534    @Deprecated
535    @Override
536    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
537            SortInfo... sortInfos) {
538        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
539                                                          .limit(limit)
540                                                          .offset(offset)
541                                                          .addSort(sortInfos);
542        return query(query);
543    }
544
545    // misc ====================================================================
546    private boolean isReady() {
547        return (esa != null) && esa.isReady();
548    }
549
550}