001/*
002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.HashMap;
030import java.util.LinkedHashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.NoSuchElementException;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.stream.Collectors;
038
039import org.apache.logging.log4j.LogManager;
040import org.apache.logging.log4j.Logger;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.elasticsearch.api.ESClient;
043import org.nuxeo.elasticsearch.api.ESClientFactory;
044import org.nuxeo.elasticsearch.api.ESHintQueryBuilder;
045import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
046import org.nuxeo.elasticsearch.config.ESHintQueryBuilderDescriptor;
047import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
048import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
049import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
050import org.nuxeo.runtime.api.Framework;
051
052import com.google.common.util.concurrent.ListenableFuture;
053
054/**
055 * @since 6.0
056 */
057public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
058    private static final Logger log = LogManager.getLogger(ElasticSearchAdminImpl.class);
059
060    protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30;
061
062    protected static final int TIMEOUT_DELETE_SECOND = 300;
063
064    protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
065
066    protected final Map<String, String> indexNames = new HashMap<>();
067
068    protected final Map<String, String> repoNames = new HashMap<>();
069
070    protected final Map<String, String> writeIndexNames = new HashMap<>();
071
072    protected final Map<String, ElasticSearchIndexConfig> indexConfig;
073
074    protected Map<String, ESHintQueryBuilder> hints;
075
076    protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig;
077
078    protected final ElasticSearchClientConfig clientConfig;
079
080    protected ElasticSearchEmbeddedNode embeddedServer;
081
082    protected ESClient client;
083
084    protected boolean indexInitDone;
085
086    protected String[] includeSourceFields;
087
088    protected String[] excludeSourceFields;
089
090    protected List<String> repositoryInitialized = new ArrayList<>();
091
092    /**
093     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
094     * The transport client initialization can be customized.
095     *
096     * @since 9.1
097     */
098    public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig,
099            ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig,
100            Collection<ESHintQueryBuilderDescriptor> hintDescriptors) {
101        this.embeddedServerConfig = embeddedServerConfig;
102        this.indexConfig = indexConfig;
103        this.clientConfig = clientConfig;
104        this.hints = hintDescriptors.stream()
105                                    .collect(Collectors.toMap(ESHintQueryBuilderDescriptor::getName,
106                                            ESHintQueryBuilderDescriptor::newInstance));
107        checkConfig();
108        connect();
109        initializeIndexes();
110    }
111
112    protected void checkConfig() {
113        if (clientConfig == null) {
114            throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting");
115        }
116    }
117
118    protected void connect() {
119        if (client != null) {
120            return;
121        }
122        if (embeddedServerConfig != null) {
123            embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig);
124            embeddedServer.start();
125        }
126        client = createClient(embeddedServer);
127        try {
128            checkClusterHealth();
129            log.info("Elasticsearch Connected");
130        } catch (Exception e) {
131            disconnect();
132            throw new IllegalStateException("Unable to check cluster health", e);
133        }
134    }
135
136    public void disconnect() {
137        if (client != null) {
138            try {
139                client.close();
140            } catch (Exception e) {
141                log.error("Failed to close client: " + e.getMessage(), e);
142            }
143            client = null;
144            indexInitDone = false;
145            log.info("Elasticsearch Disconnected");
146        }
147        if (embeddedServer != null) {
148            try {
149                embeddedServer.close();
150            } catch (IOException e) {
151                log.error("Failed to close embedded node: {}", e.getMessage(), e);
152            }
153            embeddedServer = null;
154            log.info("Elasticsearch embedded Node Stopped");
155        }
156    }
157
158    protected ESClient createClient(ElasticSearchEmbeddedNode node) {
159        log.info("Connecting to Elasticsearch");
160        ESClient ret;
161        try {
162            ESClientFactory clientFactory = clientConfig.getKlass().getDeclaredConstructor().newInstance();
163            ret = clientFactory.create(node, clientConfig);
164        } catch (ReflectiveOperationException e) {
165            log.error("Cannot instantiate Elasticsearch Client from class: {}", clientConfig::getKlass);
166            throw new NuxeoException(e);
167        }
168        return ret;
169    }
170
171    protected void checkClusterHealth(String... indexNames) {
172        if (client == null) {
173            throw new IllegalStateException("No Elasticsearch Client available");
174        }
175        client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND);
176    }
177
178    protected void initializeIndexes() {
179        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
180            if (conf.isDocumentIndex()) {
181                log.info("Associate index: {} with repository: {}", conf::getName, conf::getRepositoryName);
182                indexNames.put(conf.getRepositoryName(), conf.getName());
183                repoNames.put(conf.getName(), conf.getRepositoryName());
184                Set<String> set = new LinkedHashSet<>();
185                if (includeSourceFields != null) {
186                    set.addAll(Arrays.asList(includeSourceFields));
187                }
188                set.addAll(Arrays.asList(conf.getIncludes()));
189                if (set.contains(ALL_FIELDS)) {
190                    set.clear();
191                    set.add(ALL_FIELDS);
192                }
193                includeSourceFields = set.toArray(new String[0]);
194                set.clear();
195                if (excludeSourceFields != null) {
196                    set.addAll(Arrays.asList(excludeSourceFields));
197                }
198                set.addAll(Arrays.asList(conf.getExcludes()));
199                excludeSourceFields = set.toArray(new String[0]);
200            }
201
202        }
203        initIndexes(false);
204    }
205
206    // Admin Impl =============================================================
207    @Override
208    public void refreshRepositoryIndex(String repositoryName) {
209        log.debug("Refreshing index associated with repo: {}", repositoryName);
210        getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName)));
211        log.debug("Refreshing index done");
212    }
213
214    @Override
215    public String getIndexNameForRepository(String repositoryName) {
216        String ret = indexNames.get(repositoryName);
217        if (ret == null) {
218            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
219        }
220        return ret;
221    }
222
223    @Override
224    public String getRepositoryForIndex(String indexName) {
225        return repoNames.get(indexName);
226    }
227
228    @Override
229    public List<String> getIndexNamesForType(String type) {
230        List<String> indexNames = new ArrayList<>();
231        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
232            if (type.equals(conf.getType())) {
233                indexNames.add(conf.getName());
234            }
235        }
236        return indexNames;
237    }
238
239    @Override
240    public String getIndexNameForType(String type) {
241        List<String> indexNames = getIndexNamesForType(type);
242        if (indexNames.isEmpty()) {
243            throw new NoSuchElementException("No index defined for type: " + type);
244        }
245        return indexNames.get(0);
246    }
247
248    @Override
249    public String getWriteIndexName(String searchIndexName) {
250        return writeIndexNames.getOrDefault(searchIndexName, searchIndexName);
251    }
252
253    @Override
254    public void syncSearchAndWriteAlias(String searchIndexName) {
255        ElasticSearchIndexConfig conf = indexConfig.values()
256                                                   .stream()
257                                                   .filter(item -> item.getName().equals(searchIndexName))
258                                                   .findFirst()
259                                                   .orElseThrow(IllegalStateException::new);
260        syncSearchAndWriteAlias(conf);
261    }
262
263    @Override
264    public void flushRepositoryIndex(String repositoryName) {
265        log.warn("Flushing index associated with repo: {}", repositoryName);
266        getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName)));
267        log.info("Flushing index done");
268    }
269
270    @Override
271    public void refresh() {
272        for (String repositoryName : indexNames.keySet()) {
273            refreshRepositoryIndex(repositoryName);
274        }
275    }
276
277    @Override
278    public void flush() {
279        for (String repositoryName : indexNames.keySet()) {
280            flushRepositoryIndex(repositoryName);
281        }
282    }
283
284    @Override
285    public void optimizeIndex(String indexName) {
286        log.warn("Optimizing index: {}", indexName);
287        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
288            if (conf.getName().equals(indexName)) {
289                getClient().optimize(indexName);
290            }
291        }
292        log.info("Optimize done");
293    }
294
295    @Override
296    public void optimizeRepositoryIndex(String repositoryName) {
297        optimizeIndex(getIndexNameForRepository(repositoryName));
298    }
299
300    @Override
301    public void optimize() {
302        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
303            optimizeIndex(conf.getName());
304        }
305    }
306
307    @Override
308    public ESClient getClient() {
309        return client;
310    }
311
312    @Override
313    public void initIndexes(boolean dropIfExists) {
314        indexInitDone = false;
315        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
316            initIndex(conf, dropIfExists);
317        }
318        log.info("Elasticsearch Service ready");
319        indexInitDone = true;
320    }
321
322    @Override
323    public void dropAndInitIndex(String indexName) {
324        log.info("Drop and init index: {}", indexName);
325        indexInitDone = false;
326        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
327            if (conf.getName().equals(indexName)) {
328                initIndex(conf, true);
329            }
330        }
331        indexInitDone = true;
332    }
333
334    @Override
335    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
336        log.info("Drop and init index of repository: {}", repositoryName);
337        indexInitDone = false;
338        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
339            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
340                initIndex(conf, true, syncAlias);
341            }
342        }
343        indexInitDone = true;
344    }
345
346    @Override
347    public List<String> getRepositoryNames() {
348        return List.copyOf(indexNames.keySet());
349    }
350
351    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
352        initIndex(conf, dropIfExists, true);
353    }
354
355    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) {
356        if (conf.manageAlias()) {
357            initWriteAlias(conf, dropIfExists);
358            initSearchAlias(conf);
359            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
360            if (syncAlias) {
361                syncSearchAndWriteAlias(conf);
362            }
363        } else if (conf.hasExplicitWriteIndex()) {
364            initIndex(conf.writeIndexOrAlias(), conf, dropIfExists);
365            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
366        } else {
367            initIndex(conf.getName(), conf, dropIfExists);
368            writeIndexNames.put(conf.getName(), conf.getName());
369        }
370    }
371
372    protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) {
373        // init the write index and alias
374        String writeAlias = conf.writeIndexOrAlias();
375        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
376        String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex);
377        if (writeIndex != null && !dropIfExists) {
378            // alias exists make sure the index is well configured
379            initIndex(writeIndex, conf, false);
380        } else {
381            // create a new write index and update the alias, we don't drop anything
382            if (getClient().indexExists(nextWriteIndex)) {
383                throw new IllegalStateException(
384                        String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias));
385            }
386            initIndex(nextWriteIndex, conf, false);
387            getClient().updateAlias(writeAlias, nextWriteIndex);
388        }
389    }
390
391    protected void initSearchAlias(ElasticSearchIndexConfig conf) {
392        // init the search alias
393        String searchAlias = conf.getName();
394        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
395        String writeAlias = conf.writeIndexOrAlias();
396        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
397        if (searchIndex == null) {
398            if (getClient().indexExists(searchAlias)) {
399                if (Framework.isTestModeSet()) {
400                    // in test mode we drop an index that have the target alias name
401                    getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND);
402                }
403                searchIndex = searchAlias;
404            } else {
405                // search alias is not created, point to the write index
406                getClient().updateAlias(searchAlias, writeIndex);
407                searchIndex = writeIndex;
408            }
409        }
410        log.info("Managed index aliases: Alias: {} ->  index: {}, alias: {} ->  index: {}", searchAlias, searchIndex,
411                writeAlias, writeIndex);
412    }
413
414    /**
415     * Update the search index to point to the write index.
416     */
417    protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) {
418        if (!conf.manageAlias()) {
419            return;
420        }
421        String searchAlias = conf.getName();
422        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
423        String writeAlias = conf.writeIndexOrAlias();
424        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
425        if (!writeIndex.equals(searchIndex)) {
426            log.warn("Updating search alias {} -> {} (previously {})", searchAlias, writeIndex, searchIndex);
427            getClient().updateAlias(searchAlias, writeIndex);
428            searchIndex = writeIndex;
429        }
430        repoNames.put(searchIndex, conf.getRepositoryName());
431    }
432
433    protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) {
434        if (!conf.mustCreate()) {
435            return;
436        }
437        log.info("Initialize index: {} with conf: {}, type: {}", indexName, conf.getName(), conf.getType());
438        boolean mappingExists = false;
439        boolean indexExists = getClient().indexExists(indexName);
440        if (indexExists) {
441            if (!dropIfExists) {
442                log.debug("Index: {} already exists", indexName);
443                mappingExists = getClient().mappingExists(indexName, conf.getType());
444                if (conf.isDocumentIndex()) {
445                    // Check if the index is actually an alias.
446                    String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName());
447                    if (realIndexForAlias != null) {
448                        repoNames.put(realIndexForAlias, conf.getRepositoryName());
449                    }
450                }
451            } else {
452                if (!Framework.isTestModeSet()) {
453                    log.warn("Initializing index: {}, type: {} with dropIfExists flag, deleting an existing index",
454                            indexName, conf.getType());
455                }
456                getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND);
457                indexExists = false;
458            }
459        }
460        if (!indexExists) {
461            log.info("Creating index: {}", indexName);
462            log.debug("Using settings: {}", conf::getSettings);
463            getClient().createIndex(indexName, conf.getSettings());
464        }
465        if (!mappingExists) {
466            log.info("Creating mapping type: {} on index: {}", indexName, conf.getName());
467            log.debug("Using mapping: {}", conf::getMapping);
468            getClient().createMapping(indexName, conf.getType(), conf.getMapping());
469            if (!dropIfExists && conf.getRepositoryName() != null) {
470                repositoryInitialized.add(conf.getRepositoryName());
471            }
472        }
473        // make sure the index is ready before returning
474        checkClusterHealth(indexName);
475    }
476
477    @Override
478    public long getPendingWorkerCount() {
479        // impl of scheduling is left to the ESService
480        throw new UnsupportedOperationException("Not implemented");
481    }
482
483    @Override
484    public long getRunningWorkerCount() {
485        // impl of scheduling is left to the ESService
486        throw new UnsupportedOperationException("Not implemented");
487    }
488
489    @Override
490    public int getTotalCommandProcessed() {
491        return totalCommandProcessed.get();
492    }
493
494    @Override
495    public boolean isEmbedded() {
496        return embeddedServer != null;
497    }
498
499    @Override
500    public boolean useExternalVersion() {
501        return clientConfig.useExternalVersion();
502    }
503
504    @Override
505    public boolean isIndexingInProgress() {
506        // impl of scheduling is left to the ESService
507        throw new UnsupportedOperationException("Not implemented");
508    }
509
510    @Override
511    public ListenableFuture<Boolean> prepareWaitForIndexing() {
512        throw new UnsupportedOperationException("Not implemented");
513    }
514
515    /**
516     * Get the elastic search indexes for searches
517     */
518    protected String[] getSearchIndexes(List<String> searchRepositories) {
519        if (searchRepositories.isEmpty()) {
520            Collection<String> values = indexNames.values();
521            return values.toArray(new String[0]);
522        }
523        String[] ret = new String[searchRepositories.size()];
524        int i = 0;
525        for (String repo : searchRepositories) {
526            ret[i++] = getIndexNameForRepository(repo);
527        }
528        return ret;
529    }
530
531    public boolean isReady() {
532        return indexInitDone;
533    }
534
535    protected String[] getIncludeSourceFields() {
536        return includeSourceFields;
537    }
538
539    protected String[] getExcludeSourceFields() {
540        return excludeSourceFields;
541    }
542
543    protected Map<String, String> getRepositoryMap() {
544        return repoNames;
545    }
546
547    /**
548     * Get the list of repository names that have their index created.
549     */
550    public List<String> getInitializedRepositories() {
551        return repositoryInitialized;
552    }
553
554    @Override
555    public Optional<ESHintQueryBuilder> getHintByOperator(String name) {
556        return Optional.ofNullable(hints.get(name));
557    }
558}