001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     bdelbosc
017 */
018package org.nuxeo.elasticsearch;
019
020import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
021import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
023
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.Executors;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import javax.transaction.Transaction;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.elasticsearch.client.Client;
045import org.elasticsearch.index.query.QueryBuilder;
046import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.DocumentModelList;
049import org.nuxeo.ecm.core.api.SortInfo;
050import org.nuxeo.ecm.core.repository.RepositoryService;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
055import org.nuxeo.elasticsearch.api.ElasticSearchService;
056import org.nuxeo.elasticsearch.api.EsResult;
057import org.nuxeo.elasticsearch.commands.IndexingCommand;
058import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
059import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
060import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
061import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
062import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
063import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
064import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
065import org.nuxeo.elasticsearch.query.NxQueryBuilder;
066import org.nuxeo.elasticsearch.work.IndexingWorker;
067import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
068import org.nuxeo.runtime.api.Framework;
069import org.nuxeo.runtime.model.ComponentContext;
070import org.nuxeo.runtime.model.ComponentInstance;
071import org.nuxeo.runtime.model.DefaultComponent;
072import org.nuxeo.runtime.transaction.TransactionHelper;
073
074import com.google.common.util.concurrent.ListenableFuture;
075import com.google.common.util.concurrent.ListeningExecutorService;
076import com.google.common.util.concurrent.MoreExecutors;
077
078/**
079 * Component used to configure and manage ElasticSearch integration
080 */
081public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
082        ElasticSearchService {
083
084    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
085
086    private static final String EP_REMOTE = "elasticSearchRemote";
087
088    private static final String EP_LOCAL = "elasticSearchLocal";
089
090    private static final String EP_INDEX = "elasticSearchIndex";
091
092    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";
093
094    private static final long REINDEX_TIMEOUT = 20;
095
096    // Indexing commands that where received before the index initialization
097    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
098
099    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
100
101    private ElasticSearchLocalConfig localConfig;
102
103    private ElasticSearchRemoteConfig remoteConfig;
104
105    private ElasticSearchAdminImpl esa;
106
107    private ElasticSearchIndexingImpl esi;
108
109    private ElasticSearchServiceImpl ess;
110
111    protected JsonESDocumentWriter jsonESDocumentWriter;
112
113    private ListeningExecutorService waiterExecutorService;
114
115    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
116
117    // Nuxeo Component impl ======================================é=============
118    @Override
119    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
120        switch (extensionPoint) {
121        case EP_LOCAL:
122            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
123            if (localContrib.isEnabled()) {
124                localConfig = localContrib;
125                remoteConfig = null;
126                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
127                        + contributor.getName());
128            } else if (localConfig != null) {
129                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
130                localConfig = null;
131            }
132            break;
133        case EP_REMOTE:
134            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
135            if (remoteContribution.isEnabled()) {
136                remoteConfig = remoteContribution;
137                localConfig = null;
138                log.info("Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
139            } else if (remoteConfig != null) {
140                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
141                remoteConfig = null;
142            }
143            break;
144        case EP_INDEX:
145            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
146            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
147            if (idx.isEnabled()) {
148                idx.merge(previous);
149                indexConfig.put(idx.getName(), idx);
150                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
151            } else if (previous != null) {
152                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
153                indexConfig.remove(idx.getName());
154            }
155            break;
156        case EP_DOC_WRITER:
157            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
158            try {
159                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
160            } catch (IllegalAccessException | InstantiationException e) {
161                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
162                throw new RuntimeException(e);
163            }
164            break;
165        default:
166            throw new IllegalStateException("Invalid EP: " + extensionPoint);
167        }
168    }
169
170    @Override
171    public void applicationStarted(ComponentContext context) {
172        if (!isElasticsearchEnabled()) {
173            log.info("Elasticsearch service is disabled");
174            return;
175        }
176        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig);
177        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
178        ess = new ElasticSearchServiceImpl(esa);
179        initListenerThreadPool();
180        processStackedCommands();
181        reindexOnStartup();
182    }
183
184    private void reindexOnStartup() {
185        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
186        if (!reindexOnStartup) {
187            return;
188        }
189        for (String repositoryName : esa.getInitializedRepositories()) {
190            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
191            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
192            try {
193                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
194            } catch (InterruptedException e) {
195                Thread.currentThread().interrupt();
196            } catch (ExecutionException e) {
197                log.error(e.getMessage(), e);
198            } catch (TimeoutException e) {
199                log.warn(String.format("Indexation of repository %s not finised after %d s, continuing in background",
200                        repositoryName, REINDEX_TIMEOUT));
201            }
202        }
203    }
204
205    protected boolean isElasticsearchEnabled() {
206        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
207    }
208
209    @Override
210    public void deactivate(ComponentContext context) {
211        if (esa != null) {
212            esa.disconnect();
213        }
214    }
215
216    @Override
217    public int getApplicationStartedOrder() {
218        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
219                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
220        return component.getApplicationStartedOrder() / 2;
221    }
222
223    void processStackedCommands() {
224        if (!stackedCommands.isEmpty()) {
225            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
226            runIndexingWorker(stackedCommands);
227            stackedCommands.clear();
228            log.debug("Done");
229        }
230    }
231
232    // Es Admin ================================================================
233
234    @Override
235    public Client getClient() {
236        return esa.getClient();
237    }
238
239    @Override
240    public void initIndexes(boolean dropIfExists) {
241        esa.initIndexes(dropIfExists);
242    }
243
244    @Override
245    public void dropAndInitIndex(String indexName) {
246        esa.dropAndInitIndex(indexName);
247    }
248
249    @Override
250    public void dropAndInitRepositoryIndex(String repositoryName) {
251        esa.dropAndInitRepositoryIndex(repositoryName);
252    }
253
254    @Override
255    public List<String> getRepositoryNames() {
256        return esa.getRepositoryNames();
257    }
258
259    @Override
260    public String getIndexNameForRepository(String repositoryName) {
261        return esa.getIndexNameForRepository(repositoryName);
262    }
263
264    @Override
265    public List<String> getIndexNamesForType(String type) {
266        return esa.getIndexNamesForType(type);
267    }
268
269    @Override
270    public String getIndexNameForType(String type) {
271        return esa.getIndexNameForType(type);
272    }
273
274    @Override
275    public int getPendingWorkerCount() {
276        WorkManager wm = Framework.getLocalService(WorkManager.class);
277        return  wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
278    }
279
280    @Override
281    public int getRunningWorkerCount() {
282        WorkManager wm = Framework.getLocalService(WorkManager.class);
283        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
284    }
285
286    @Override
287    public int getTotalCommandProcessed() {
288        return esa.getTotalCommandProcessed();
289    }
290
291    @Override
292    public boolean isEmbedded() {
293        return esa.isEmbedded();
294    }
295
296    @Override
297    public boolean isIndexingInProgress() {
298        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() >  0);
299    }
300
301    @Override
302    public ListenableFuture<Boolean> prepareWaitForIndexing() {
303        return waiterExecutorService.submit(new Callable<Boolean>() {
304            @Override
305            public Boolean call() throws Exception {
306                WorkManager wm = Framework.getLocalService(WorkManager.class);
307                wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
308                return true;
309            }
310        });
311    }
312
313    private static class NamedThreadFactory implements ThreadFactory {
314        @SuppressWarnings("NullableProblems")
315        @Override
316        public Thread newThread(Runnable r) {
317            return new Thread(r, "waitForEsIndexing");
318        }
319    }
320
321    protected void initListenerThreadPool() {
322        waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory()));
323    }
324
325    @Override
326    public void refresh() {
327        esa.refresh();
328    }
329
330    @Override
331    public void refreshRepositoryIndex(String repositoryName) {
332        esa.refreshRepositoryIndex(repositoryName);
333    }
334
335    @Override
336    public void flush() {
337        esa.flush();
338    }
339
340    @Override
341    public void flushRepositoryIndex(String repositoryName) {
342        esa.flushRepositoryIndex(repositoryName);
343    }
344
345    @Override
346    public void optimize() {
347        esa.optimize();
348    }
349
350    @Override
351    public void optimizeRepositoryIndex(String repositoryName) {
352        esa.optimizeRepositoryIndex(repositoryName);
353    }
354
355    @Override
356    public void optimizeIndex(String indexName) {
357        esa.optimizeIndex(indexName);
358    }
359
360    // ES Indexing =============================================================
361
362    @Override
363    public void indexNonRecursive(IndexingCommand cmd) {
364        List<IndexingCommand> cmds = new ArrayList<>(1);
365        cmds.add(cmd);
366        indexNonRecursive(cmds);
367    }
368
369    @Override
370    public void indexNonRecursive(List<IndexingCommand> cmds) {
371        if (!isReady()) {
372            stackCommands(cmds);
373            return;
374        }
375        if (log.isDebugEnabled()) {
376            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
377        }
378        esi.indexNonRecursive(cmds);
379    }
380
381    protected void stackCommands(List<IndexingCommand> cmds) {
382        if (log.isDebugEnabled()) {
383            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
384                    + Arrays.toString(cmds.toArray()));
385        }
386        stackedCommands.addAll(cmds);
387    }
388
389    @Override
390    public void runIndexingWorker(List<IndexingCommand> cmds) {
391        if (!isReady()) {
392            stackCommands(cmds);
393            return;
394        }
395        runIndexingWorkerCount.incrementAndGet();
396        try {
397            dispatchWork(cmds);
398        } finally {
399            runIndexingWorkerCount.decrementAndGet();
400        }
401    }
402
403    /**
404     * Dispatch jobs between sync and async worker
405     */
406    protected void dispatchWork(List<IndexingCommand> cmds) {
407        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
408        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
409        for (IndexingCommand cmd : cmds) {
410            if (cmd.isSync()) {
411                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
412                if (syncCmds == null) {
413                    syncCmds = new ArrayList<>();
414                }
415                syncCmds.add(cmd);
416                syncCommands.put(cmd.getRepositoryName(), syncCmds);
417            } else {
418                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
419                if (asyncCmds == null) {
420                    asyncCmds = new ArrayList<>();
421                }
422                asyncCmds.add(cmd);
423                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
424            }
425        }
426        runIndexingSyncWorker(syncCommands);
427        scheduleIndexingAsyncWorker(asyncCommands);
428    }
429
430    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
431        if (asyncCommands.isEmpty()) {
432            return;
433        }
434        WorkManager wm = Framework.getLocalService(WorkManager.class);
435        for (String repositoryName : asyncCommands.keySet()) {
436            IndexingWorker idxWork = new IndexingWorker(repositoryName,
437                    asyncCommands.get(repositoryName));
438            // we are in afterCompletion don't wait for a commit
439            wm.schedule(idxWork, false);
440        }
441    }
442
443    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
444        if (syncCommands.isEmpty()) {
445            return;
446        }
447        Transaction transaction = TransactionHelper.suspendTransaction();
448        try {
449            for (String repositoryName : syncCommands.keySet()) {
450                IndexingWorker idxWork = new IndexingWorker(repositoryName,
451                        syncCommands.get(repositoryName));
452                idxWork.run();
453            }
454        } finally {
455            if (transaction != null) {
456                TransactionHelper.resumeTransaction(transaction);
457            }
458
459        }
460    }
461
462    @Override
463    public void runReindexingWorker(String repositoryName, String nxql) {
464        if (nxql == null || nxql.isEmpty()) {
465            throw new IllegalArgumentException("Expecting an NXQL query");
466        }
467        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
468        WorkManager wm = Framework.getLocalService(WorkManager.class);
469        wm.schedule(worker);
470    }
471
472    // ES Search ===============================================================
473    @Override
474    public DocumentModelList query(NxQueryBuilder queryBuilder) {
475        return ess.query(queryBuilder);
476    }
477
478    @Override
479    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
480        return ess.queryAndAggregate(queryBuilder);
481    }
482
483    @Deprecated
484    @Override
485    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos)
486            {
487        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
488        return query(query);
489    }
490
491    @Deprecated
492    @Override
493    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
494            SortInfo... sortInfos) {
495        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
496                sortInfos);
497        return query(query);
498    }
499
500    // misc ====================================================================
501    private boolean isReady() {
502        return (esa != null) && esa.isReady();
503    }
504
505}