001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.Executors;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import javax.transaction.Transaction;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.elasticsearch.client.Client;
045import org.elasticsearch.index.query.QueryBuilder;
046import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.DocumentModelList;
049import org.nuxeo.ecm.core.api.SortInfo;
050import org.nuxeo.ecm.core.repository.RepositoryService;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
055import org.nuxeo.elasticsearch.api.ElasticSearchService;
056import org.nuxeo.elasticsearch.api.EsResult;
057import org.nuxeo.elasticsearch.api.EsScrollResult;
058import org.nuxeo.elasticsearch.commands.IndexingCommand;
059import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
060import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
061import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
062import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
063import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
064import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
065import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
066import org.nuxeo.elasticsearch.query.NxQueryBuilder;
067import org.nuxeo.elasticsearch.work.IndexingWorker;
068import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
069import org.nuxeo.runtime.api.Framework;
070import org.nuxeo.runtime.model.ComponentContext;
071import org.nuxeo.runtime.model.ComponentInstance;
072import org.nuxeo.runtime.model.DefaultComponent;
073import org.nuxeo.runtime.transaction.TransactionHelper;
074
075import com.google.common.util.concurrent.ListenableFuture;
076import com.google.common.util.concurrent.ListeningExecutorService;
077import com.google.common.util.concurrent.MoreExecutors;
078
079/**
080 * Component used to configure and manage ElasticSearch integration
081 */
082public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
083        ElasticSearchService {
084
085    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
086
087    private static final String EP_REMOTE = "elasticSearchRemote";
088
089    private static final String EP_LOCAL = "elasticSearchLocal";
090
091    private static final String EP_INDEX = "elasticSearchIndex";
092
093    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";
094
095    private static final long REINDEX_TIMEOUT = 20;
096
    // Indexing commands that were received before the index initialization
098    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
099
100    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
101
102    private ElasticSearchLocalConfig localConfig;
103
104    private ElasticSearchRemoteConfig remoteConfig;
105
106    private ElasticSearchAdminImpl esa;
107
108    private ElasticSearchIndexingImpl esi;
109
110    private ElasticSearchServiceImpl ess;
111
112    protected JsonESDocumentWriter jsonESDocumentWriter;
113
114    private ListeningExecutorService waiterExecutorService;
115
116    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
117
    // Nuxeo Component impl ====================================================
119    @Override
120    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
121        switch (extensionPoint) {
122        case EP_LOCAL:
123            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
124            if (localContrib.isEnabled()) {
125                localConfig = localContrib;
126                remoteConfig = null;
127                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
128                        + contributor.getName());
129            } else if (localConfig != null) {
130                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
131                localConfig = null;
132            }
133            break;
134        case EP_REMOTE:
135            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
136            if (remoteContribution.isEnabled()) {
137                remoteConfig = remoteContribution;
138                localConfig = null;
139                log.info("Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
140            } else if (remoteConfig != null) {
141                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
142                remoteConfig = null;
143            }
144            break;
145        case EP_INDEX:
146            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
147            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
148            if (idx.isEnabled()) {
149                idx.merge(previous);
150                indexConfig.put(idx.getName(), idx);
151                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
152            } else if (previous != null) {
153                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
154                indexConfig.remove(idx.getName());
155            }
156            break;
157        case EP_DOC_WRITER:
158            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
159            try {
160                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
161            } catch (IllegalAccessException | InstantiationException e) {
162                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
163                throw new RuntimeException(e);
164            }
165            break;
166        default:
167            throw new IllegalStateException("Invalid EP: " + extensionPoint);
168        }
169    }
170
171    @Override
172    public void applicationStarted(ComponentContext context) {
173        if (!isElasticsearchEnabled()) {
174            log.info("Elasticsearch service is disabled");
175            return;
176        }
177        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig);
178        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
179        ess = new ElasticSearchServiceImpl(esa);
180        initListenerThreadPool();
181        processStackedCommands();
182        reindexOnStartup();
183    }
184
185    private void reindexOnStartup() {
186        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
187        if (!reindexOnStartup) {
188            return;
189        }
190        for (String repositoryName : esa.getInitializedRepositories()) {
191            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
192            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
193            try {
194                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
195            } catch (InterruptedException e) {
196                Thread.currentThread().interrupt();
197            } catch (ExecutionException e) {
198                log.error(e.getMessage(), e);
199            } catch (TimeoutException e) {
200                log.warn(String.format("Indexation of repository %s not finised after %d s, continuing in background",
201                        repositoryName, REINDEX_TIMEOUT));
202            }
203        }
204    }
205
206    protected boolean isElasticsearchEnabled() {
207        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
208    }
209
210    @Override
211    public void deactivate(ComponentContext context) {
212        if (esa != null) {
213            esa.disconnect();
214        }
215    }
216
217    @Override
218    public int getApplicationStartedOrder() {
219        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
220                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
221        return component.getApplicationStartedOrder() / 2;
222    }
223
224    void processStackedCommands() {
225        if (!stackedCommands.isEmpty()) {
226            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
227            runIndexingWorker(stackedCommands);
228            stackedCommands.clear();
229            log.debug("Done");
230        }
231    }
232
233    // Es Admin ================================================================
234
235    @Override
236    public Client getClient() {
237        return esa.getClient();
238    }
239
240    @Override
241    public void initIndexes(boolean dropIfExists) {
242        esa.initIndexes(dropIfExists);
243    }
244
245    @Override
246    public void dropAndInitIndex(String indexName) {
247        esa.dropAndInitIndex(indexName);
248    }
249
250    @Override
251    public void dropAndInitRepositoryIndex(String repositoryName) {
252        esa.dropAndInitRepositoryIndex(repositoryName);
253    }
254
255    @Override
256    public List<String> getRepositoryNames() {
257        return esa.getRepositoryNames();
258    }
259
260    @Override
261    public String getIndexNameForRepository(String repositoryName) {
262        return esa.getIndexNameForRepository(repositoryName);
263    }
264
265    @Override
266    public List<String> getIndexNamesForType(String type) {
267        return esa.getIndexNamesForType(type);
268    }
269
270    @Override
271    public String getIndexNameForType(String type) {
272        return esa.getIndexNameForType(type);
273    }
274
275    @Override
276    public long getPendingWorkerCount() {
277        WorkManager wm = Framework.getLocalService(WorkManager.class);
278        return wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
279    }
280
281    @Override
282    public long getRunningWorkerCount() {
283        WorkManager wm = Framework.getLocalService(WorkManager.class);
284        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
285    }
286
287    @Override
288    public int getTotalCommandProcessed() {
289        return esa.getTotalCommandProcessed();
290    }
291
292    @Override
293    public boolean isEmbedded() {
294        return esa.isEmbedded();
295    }
296
297    @Override
298    public boolean useExternalVersion() {
299        return esa.useExternalVersion();
300    }
301
302    @Override
303    public boolean isIndexingInProgress() {
304        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() > 0);
305    }
306
307    @Override
308    public ListenableFuture<Boolean> prepareWaitForIndexing() {
309        return waiterExecutorService.submit(new Callable<Boolean>() {
310            @Override
311            public Boolean call() throws Exception {
312                WorkManager wm = Framework.getLocalService(WorkManager.class);
313                boolean completed = false;
314                do {
315                    completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
316                } while (!completed);
317                return true;
318            }
319        });
320    }
321
322    private static class NamedThreadFactory implements ThreadFactory {
323        @SuppressWarnings("NullableProblems")
324        @Override
325        public Thread newThread(Runnable r) {
326            return new Thread(r, "waitForEsIndexing");
327        }
328    }
329
330    protected void initListenerThreadPool() {
331        waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory()));
332    }
333
334    @Override
335    public void refresh() {
336        esa.refresh();
337    }
338
339    @Override
340    public void refreshRepositoryIndex(String repositoryName) {
341        esa.refreshRepositoryIndex(repositoryName);
342    }
343
344    @Override
345    public void flush() {
346        esa.flush();
347    }
348
349    @Override
350    public void flushRepositoryIndex(String repositoryName) {
351        esa.flushRepositoryIndex(repositoryName);
352    }
353
354    @Override
355    public void optimize() {
356        esa.optimize();
357    }
358
359    @Override
360    public void optimizeRepositoryIndex(String repositoryName) {
361        esa.optimizeRepositoryIndex(repositoryName);
362    }
363
364    @Override
365    public void optimizeIndex(String indexName) {
366        esa.optimizeIndex(indexName);
367    }
368
369    // ES Indexing =============================================================
370
371    @Override
372    public void indexNonRecursive(IndexingCommand cmd) {
373        List<IndexingCommand> cmds = new ArrayList<>(1);
374        cmds.add(cmd);
375        indexNonRecursive(cmds);
376    }
377
378    @Override
379    public void indexNonRecursive(List<IndexingCommand> cmds) {
380        if (!isReady()) {
381            stackCommands(cmds);
382            return;
383        }
384        if (log.isDebugEnabled()) {
385            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
386        }
387        esi.indexNonRecursive(cmds);
388    }
389
390    protected void stackCommands(List<IndexingCommand> cmds) {
391        if (log.isDebugEnabled()) {
392            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
393                    + Arrays.toString(cmds.toArray()));
394        }
395        stackedCommands.addAll(cmds);
396    }
397
398    @Override
399    public void runIndexingWorker(List<IndexingCommand> cmds) {
400        if (!isReady()) {
401            stackCommands(cmds);
402            return;
403        }
404        runIndexingWorkerCount.incrementAndGet();
405        try {
406            dispatchWork(cmds);
407        } finally {
408            runIndexingWorkerCount.decrementAndGet();
409        }
410    }
411
412    /**
413     * Dispatch jobs between sync and async worker
414     */
415    protected void dispatchWork(List<IndexingCommand> cmds) {
416        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
417        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
418        for (IndexingCommand cmd : cmds) {
419            if (cmd.isSync()) {
420                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
421                if (syncCmds == null) {
422                    syncCmds = new ArrayList<>();
423                }
424                syncCmds.add(cmd);
425                syncCommands.put(cmd.getRepositoryName(), syncCmds);
426            } else {
427                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
428                if (asyncCmds == null) {
429                    asyncCmds = new ArrayList<>();
430                }
431                asyncCmds.add(cmd);
432                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
433            }
434        }
435        runIndexingSyncWorker(syncCommands);
436        scheduleIndexingAsyncWorker(asyncCommands);
437    }
438
439    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
440        if (asyncCommands.isEmpty()) {
441            return;
442        }
443        WorkManager wm = Framework.getLocalService(WorkManager.class);
444        for (String repositoryName : asyncCommands.keySet()) {
445            IndexingWorker idxWork = new IndexingWorker(repositoryName, asyncCommands.get(repositoryName));
446            // we are in afterCompletion don't wait for a commit
447            wm.schedule(idxWork, false);
448        }
449    }
450
451    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
452        if (syncCommands.isEmpty()) {
453            return;
454        }
455        Transaction transaction = TransactionHelper.suspendTransaction();
456        try {
457            for (String repositoryName : syncCommands.keySet()) {
458                IndexingWorker idxWork = new IndexingWorker(repositoryName, syncCommands.get(repositoryName));
459                idxWork.run();
460            }
461        } finally {
462            if (transaction != null) {
463                TransactionHelper.resumeTransaction(transaction);
464            }
465
466        }
467    }
468
469    @Override
470    public void runReindexingWorker(String repositoryName, String nxql) {
471        if (nxql == null || nxql.isEmpty()) {
472            throw new IllegalArgumentException("Expecting an NXQL query");
473        }
474        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
475        WorkManager wm = Framework.getLocalService(WorkManager.class);
476        wm.schedule(worker);
477    }
478
479    // ES Search ===============================================================
480    @Override
481    public DocumentModelList query(NxQueryBuilder queryBuilder) {
482        return ess.query(queryBuilder);
483    }
484
485    @Override
486    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
487        return ess.queryAndAggregate(queryBuilder);
488    }
489
490    @Override
491    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
492        return ess.scroll(queryBuilder, keepAlive);
493    }
494
495    @Override
496    public EsScrollResult scroll(EsScrollResult scrollResult) {
497        return ess.scroll(scrollResult);
498    }
499
500    @Override
501    public void clearScroll(EsScrollResult scrollResult) {
502        ess.clearScroll(scrollResult);
503    }
504
505    @Deprecated
506    @Override
507    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
508        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
509        return query(query);
510    }
511
512    @Deprecated
513    @Override
514    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
515            SortInfo... sortInfos) {
516        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
517                                                          .limit(limit)
518                                                          .offset(offset)
519                                                          .addSort(sortInfos);
520        return query(query);
521    }
522
523    // misc ====================================================================
524    private boolean isReady() {
525        return (esa != null) && esa.isReady();
526    }
527
528}