001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020package org.nuxeo.elasticsearch;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.ES_ENABLED_PROPERTY;
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.INDEXING_QUEUE_ID;
024import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_ON_STARTUP_PROPERTY;
025
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.Callable;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.Executors;
037import java.util.concurrent.ThreadFactory;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.transaction.Transaction;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.elasticsearch.client.Client;
047import org.elasticsearch.index.query.QueryBuilder;
048import org.nuxeo.ecm.automation.jaxrs.io.documents.JsonESDocumentWriter;
049import org.nuxeo.ecm.core.api.CoreSession;
050import org.nuxeo.ecm.core.api.DocumentModelList;
051import org.nuxeo.ecm.core.api.SortInfo;
052import org.nuxeo.ecm.core.repository.RepositoryService;
053import org.nuxeo.ecm.core.work.api.Work;
054import org.nuxeo.ecm.core.work.api.WorkManager;
055import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
056import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
057import org.nuxeo.elasticsearch.api.ElasticSearchService;
058import org.nuxeo.elasticsearch.api.EsResult;
059import org.nuxeo.elasticsearch.commands.IndexingCommand;
060import org.nuxeo.elasticsearch.config.ElasticSearchDocWriterDescriptor;
061import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
062import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
063import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
064import org.nuxeo.elasticsearch.core.ElasticSearchAdminImpl;
065import org.nuxeo.elasticsearch.core.ElasticSearchIndexingImpl;
066import org.nuxeo.elasticsearch.core.ElasticSearchServiceImpl;
067import org.nuxeo.elasticsearch.query.NxQueryBuilder;
068import org.nuxeo.elasticsearch.work.IndexingWorker;
069import org.nuxeo.elasticsearch.work.ScrollingIndexingWorker;
070import org.nuxeo.runtime.api.Framework;
071import org.nuxeo.runtime.model.ComponentContext;
072import org.nuxeo.runtime.model.ComponentInstance;
073import org.nuxeo.runtime.model.DefaultComponent;
074import org.nuxeo.runtime.transaction.TransactionHelper;
075
076import com.google.common.util.concurrent.ListenableFuture;
077import com.google.common.util.concurrent.ListeningExecutorService;
078import com.google.common.util.concurrent.MoreExecutors;
079
080/**
081 * Component used to configure and manage ElasticSearch integration
082 */
083public class ElasticSearchComponent extends DefaultComponent implements ElasticSearchAdmin, ElasticSearchIndexing,
084        ElasticSearchService {
085
086    private static final Log log = LogFactory.getLog(ElasticSearchComponent.class);
087
088    private static final String EP_REMOTE = "elasticSearchRemote";
089
090    private static final String EP_LOCAL = "elasticSearchLocal";
091
092    private static final String EP_INDEX = "elasticSearchIndex";
093
094    private static final String EP_DOC_WRITER = "elasticSearchDocWriter";
095
096    private static final long REINDEX_TIMEOUT = 20;
097
098    // Indexing commands that were received before the index initialization
099    private final List<IndexingCommand> stackedCommands = Collections.synchronizedList(new ArrayList<>());
100
101    private final Map<String, ElasticSearchIndexConfig> indexConfig = new HashMap<>();
102
103    private ElasticSearchLocalConfig localConfig;
104
105    private ElasticSearchRemoteConfig remoteConfig;
106
107    private ElasticSearchAdminImpl esa;
108
109    private ElasticSearchIndexingImpl esi;
110
111    private ElasticSearchServiceImpl ess;
112
113    protected JsonESDocumentWriter jsonESDocumentWriter;
114
115    private ListeningExecutorService waiterExecutorService;
116
117    private final AtomicInteger runIndexingWorkerCount = new AtomicInteger(0);
118
119    // Nuxeo Component impl ====================================================
120    @Override
121    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
122        switch (extensionPoint) {
123        case EP_LOCAL:
124            ElasticSearchLocalConfig localContrib = (ElasticSearchLocalConfig) contribution;
125            if (localContrib.isEnabled()) {
126                localConfig = localContrib;
127                remoteConfig = null;
128                log.info("Registering local embedded configuration: " + localConfig + ", loaded from "
129                        + contributor.getName());
130            } else if (localConfig != null) {
131                log.info("Disabling previous local embedded configuration, deactivated by " + contributor.getName());
132                localConfig = null;
133            }
134            break;
135        case EP_REMOTE:
136            ElasticSearchRemoteConfig remoteContribution = (ElasticSearchRemoteConfig) contribution;
137            if (remoteContribution.isEnabled()) {
138                remoteConfig = remoteContribution;
139                localConfig = null;
140                log.info("Registering remote configuration: " + remoteConfig + ", loaded from " + contributor.getName());
141            } else if (remoteConfig != null) {
142                log.info("Disabling previous remote configuration, deactivated by " + contributor.getName());
143                remoteConfig = null;
144            }
145            break;
146        case EP_INDEX:
147            ElasticSearchIndexConfig idx = (ElasticSearchIndexConfig) contribution;
148            ElasticSearchIndexConfig previous = indexConfig.get(idx.getName());
149            if (idx.isEnabled()) {
150                idx.merge(previous);
151                indexConfig.put(idx.getName(), idx);
152                log.info("Registering index configuration: " + idx + ", loaded from " + contributor.getName());
153            } else if (previous != null) {
154                log.info("Disabling index configuration: " + previous + ", deactivated by " + contributor.getName());
155                indexConfig.remove(idx.getName());
156            }
157            break;
158        case EP_DOC_WRITER:
159            ElasticSearchDocWriterDescriptor writerDescriptor = (ElasticSearchDocWriterDescriptor) contribution;
160            try {
161                jsonESDocumentWriter = writerDescriptor.getKlass().newInstance();
162            } catch (IllegalAccessException | InstantiationException e) {
163                log.error("Can not instantiate jsonESDocumentWriter from " + writerDescriptor.getKlass());
164                throw new RuntimeException(e);
165            }
166            break;
167        default:
168            throw new IllegalStateException("Invalid EP: " + extensionPoint);
169        }
170    }
171
172    @Override
173    public void applicationStarted(ComponentContext context) {
174        if (!isElasticsearchEnabled()) {
175            log.info("Elasticsearch service is disabled");
176            return;
177        }
178        esa = new ElasticSearchAdminImpl(localConfig, remoteConfig, indexConfig);
179        esi = new ElasticSearchIndexingImpl(esa, jsonESDocumentWriter);
180        ess = new ElasticSearchServiceImpl(esa);
181        initListenerThreadPool();
182        processStackedCommands();
183        reindexOnStartup();
184    }
185
186    private void reindexOnStartup() {
187        boolean reindexOnStartup = Boolean.parseBoolean(Framework.getProperty(REINDEX_ON_STARTUP_PROPERTY, "false"));
188        if (!reindexOnStartup) {
189            return;
190        }
191        for (String repositoryName : esa.getInitializedRepositories()) {
192            log.warn(String.format("Indexing repository: %s on startup", repositoryName));
193            runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
194            try {
195                prepareWaitForIndexing().get(REINDEX_TIMEOUT, TimeUnit.SECONDS);
196            } catch (InterruptedException e) {
197                Thread.currentThread().interrupt();
198            } catch (ExecutionException e) {
199                log.error(e.getMessage(), e);
200            } catch (TimeoutException e) {
201                log.warn(String.format("Indexation of repository %s not finised after %d s, continuing in background",
202                        repositoryName, REINDEX_TIMEOUT));
203            }
204        }
205    }
206
207    protected boolean isElasticsearchEnabled() {
208        return Boolean.parseBoolean(Framework.getProperty(ES_ENABLED_PROPERTY, "true"));
209    }
210
211    @Override
212    public void deactivate(ComponentContext context) {
213        if (esa != null) {
214            esa.disconnect();
215        }
216    }
217
218    @Override
219    public int getApplicationStartedOrder() {
220        RepositoryService component = (RepositoryService) Framework.getRuntime().getComponent(
221                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
222        return component.getApplicationStartedOrder() / 2;
223    }
224
225    void processStackedCommands() {
226        if (!stackedCommands.isEmpty()) {
227            log.info(String.format("Processing %d indexing commands stacked during startup", stackedCommands.size()));
228            runIndexingWorker(stackedCommands);
229            stackedCommands.clear();
230            log.debug("Done");
231        }
232    }
233
234    // Es Admin ================================================================
235
236    @Override
237    public Client getClient() {
238        return esa.getClient();
239    }
240
241    @Override
242    public void initIndexes(boolean dropIfExists) {
243        esa.initIndexes(dropIfExists);
244    }
245
246    @Override
247    public void dropAndInitIndex(String indexName) {
248        esa.dropAndInitIndex(indexName);
249    }
250
251    @Override
252    public void dropAndInitRepositoryIndex(String repositoryName) {
253        esa.dropAndInitRepositoryIndex(repositoryName);
254    }
255
256    @Override
257    public List<String> getRepositoryNames() {
258        return esa.getRepositoryNames();
259    }
260
261    @Override
262    public String getIndexNameForRepository(String repositoryName) {
263        return esa.getIndexNameForRepository(repositoryName);
264    }
265
266    @Override
267    public List<String> getIndexNamesForType(String type) {
268        return esa.getIndexNamesForType(type);
269    }
270
271    @Override
272    public String getIndexNameForType(String type) {
273        return esa.getIndexNameForType(type);
274    }
275
276    @Override
277    public int getPendingWorkerCount() {
278        WorkManager wm = Framework.getLocalService(WorkManager.class);
279        return  wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.SCHEDULED);
280    }
281
282    @Override
283    public int getRunningWorkerCount() {
284        WorkManager wm = Framework.getLocalService(WorkManager.class);
285        return runIndexingWorkerCount.get() + wm.getQueueSize(INDEXING_QUEUE_ID, Work.State.RUNNING);
286    }
287
288    @Override
289    public int getTotalCommandProcessed() {
290        return esa.getTotalCommandProcessed();
291    }
292
293    @Override
294    public boolean isEmbedded() {
295        return esa.isEmbedded();
296    }
297
298    @Override
299    public boolean isIndexingInProgress() {
300        return (runIndexingWorkerCount.get() > 0) || (getPendingWorkerCount() > 0) || (getRunningWorkerCount() >  0);
301    }
302
303    @Override
304    public ListenableFuture<Boolean> prepareWaitForIndexing() {
305        return waiterExecutorService.submit(new Callable<Boolean>() {
306            @Override
307            public Boolean call() throws Exception {
308                WorkManager wm = Framework.getLocalService(WorkManager.class);
309                boolean completed = false;
310                do {
311                    completed = wm.awaitCompletion(INDEXING_QUEUE_ID, 300, TimeUnit.SECONDS);
312                } while (! completed);
313                return true;
314            }
315        });
316    }
317
318    private static class NamedThreadFactory implements ThreadFactory {
319        @SuppressWarnings("NullableProblems")
320        @Override
321        public Thread newThread(Runnable r) {
322            return new Thread(r, "waitForEsIndexing");
323        }
324    }
325
326    protected void initListenerThreadPool() {
327        waiterExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory()));
328    }
329
330    @Override
331    public void refresh() {
332        esa.refresh();
333    }
334
335    @Override
336    public void refreshRepositoryIndex(String repositoryName) {
337        esa.refreshRepositoryIndex(repositoryName);
338    }
339
340    @Override
341    public void flush() {
342        esa.flush();
343    }
344
345    @Override
346    public void flushRepositoryIndex(String repositoryName) {
347        esa.flushRepositoryIndex(repositoryName);
348    }
349
350    @Override
351    public void optimize() {
352        esa.optimize();
353    }
354
355    @Override
356    public void optimizeRepositoryIndex(String repositoryName) {
357        esa.optimizeRepositoryIndex(repositoryName);
358    }
359
360    @Override
361    public void optimizeIndex(String indexName) {
362        esa.optimizeIndex(indexName);
363    }
364
365    // ES Indexing =============================================================
366
367    @Override
368    public void indexNonRecursive(IndexingCommand cmd) {
369        List<IndexingCommand> cmds = new ArrayList<>(1);
370        cmds.add(cmd);
371        indexNonRecursive(cmds);
372    }
373
374    @Override
375    public void indexNonRecursive(List<IndexingCommand> cmds) {
376        if (!isReady()) {
377            stackCommands(cmds);
378            return;
379        }
380        if (log.isDebugEnabled()) {
381            log.debug("Process indexing commands: " + Arrays.toString(cmds.toArray()));
382        }
383        esi.indexNonRecursive(cmds);
384    }
385
386    protected void stackCommands(List<IndexingCommand> cmds) {
387        if (log.isDebugEnabled()) {
388            log.debug("Delaying indexing commands: Waiting for Index to be initialized."
389                    + Arrays.toString(cmds.toArray()));
390        }
391        stackedCommands.addAll(cmds);
392    }
393
394    @Override
395    public void runIndexingWorker(List<IndexingCommand> cmds) {
396        if (!isReady()) {
397            stackCommands(cmds);
398            return;
399        }
400        runIndexingWorkerCount.incrementAndGet();
401        try {
402            dispatchWork(cmds);
403        } finally {
404            runIndexingWorkerCount.decrementAndGet();
405        }
406    }
407
408    /**
409     * Dispatch jobs between sync and async worker
410     */
411    protected void dispatchWork(List<IndexingCommand> cmds) {
412        Map<String, List<IndexingCommand>> syncCommands = new HashMap<>();
413        Map<String, List<IndexingCommand>> asyncCommands = new HashMap<>();
414        for (IndexingCommand cmd : cmds) {
415            if (cmd.isSync()) {
416                List<IndexingCommand> syncCmds = syncCommands.get(cmd.getRepositoryName());
417                if (syncCmds == null) {
418                    syncCmds = new ArrayList<>();
419                }
420                syncCmds.add(cmd);
421                syncCommands.put(cmd.getRepositoryName(), syncCmds);
422            } else {
423                List<IndexingCommand> asyncCmds = asyncCommands.get(cmd.getRepositoryName());
424                if (asyncCmds == null) {
425                    asyncCmds = new ArrayList<>();
426                }
427                asyncCmds.add(cmd);
428                asyncCommands.put(cmd.getRepositoryName(), asyncCmds);
429            }
430        }
431        runIndexingSyncWorker(syncCommands);
432        scheduleIndexingAsyncWorker(asyncCommands);
433    }
434
435    protected void scheduleIndexingAsyncWorker(Map<String, List<IndexingCommand>> asyncCommands) {
436        if (asyncCommands.isEmpty()) {
437            return;
438        }
439        WorkManager wm = Framework.getLocalService(WorkManager.class);
440        for (String repositoryName : asyncCommands.keySet()) {
441            IndexingWorker idxWork = new IndexingWorker(repositoryName,
442                    asyncCommands.get(repositoryName));
443            // we are in afterCompletion don't wait for a commit
444            wm.schedule(idxWork, false);
445        }
446    }
447
448    protected void runIndexingSyncWorker(Map<String, List<IndexingCommand>> syncCommands) {
449        if (syncCommands.isEmpty()) {
450            return;
451        }
452        Transaction transaction = TransactionHelper.suspendTransaction();
453        try {
454            for (String repositoryName : syncCommands.keySet()) {
455                IndexingWorker idxWork = new IndexingWorker(repositoryName,
456                        syncCommands.get(repositoryName));
457                idxWork.run();
458            }
459        } finally {
460            if (transaction != null) {
461                TransactionHelper.resumeTransaction(transaction);
462            }
463
464        }
465    }
466
467    @Override
468    public void runReindexingWorker(String repositoryName, String nxql) {
469        if (nxql == null || nxql.isEmpty()) {
470            throw new IllegalArgumentException("Expecting an NXQL query");
471        }
472        ScrollingIndexingWorker worker = new ScrollingIndexingWorker(repositoryName, nxql);
473        WorkManager wm = Framework.getLocalService(WorkManager.class);
474        wm.schedule(worker);
475    }
476
477    // ES Search ===============================================================
478    @Override
479    public DocumentModelList query(NxQueryBuilder queryBuilder) {
480        return ess.query(queryBuilder);
481    }
482
483    @Override
484    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
485        return ess.queryAndAggregate(queryBuilder);
486    }
487
488    @Deprecated
489    @Override
490    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos)
491            {
492        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
493        return query(query);
494    }
495
496    @Deprecated
497    @Override
498    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
499            SortInfo... sortInfos) {
500        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
501                sortInfos);
502        return query(query);
503    }
504
505    // misc ====================================================================
506    private boolean isReady() {
507        return (esa != null) && esa.isReady();
508    }
509
510}