001/*
002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.LinkedHashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.NoSuchElementException;
035import java.util.Set;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.elasticsearch.api.ESClient;
042import org.nuxeo.elasticsearch.api.ESClientFactory;
043import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
044import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
045import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
046import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
047import org.nuxeo.runtime.api.Framework;
048
049import com.google.common.util.concurrent.ListenableFuture;
050
051/**
052 * @since 6.0
053 */
054public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
055    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
056
057    protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30;
058
059    protected static final int TIMEOUT_DELETE_SECOND = 300;
060
061    protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
062
063    protected final Map<String, String> indexNames = new HashMap<>();
064
065    protected final Map<String, String> repoNames = new HashMap<>();
066
067    protected final Map<String, String> writeIndexNames = new HashMap<>();
068
069    protected final Map<String, ElasticSearchIndexConfig> indexConfig;
070
071    protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig;
072
073    protected final ElasticSearchClientConfig clientConfig;
074
075    protected ElasticSearchEmbeddedNode embeddedServer;
076
077    protected ESClient client;
078
079    protected boolean indexInitDone;
080
081    protected String[] includeSourceFields;
082
083    protected String[] excludeSourceFields;
084
085    protected List<String> repositoryInitialized = new ArrayList<>();
086
087    /**
088     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
089     * The transport client initialization can be customized.
090     *
091     * @since 9.1
092     */
093    public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig,
094            ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig) {
095        this.embeddedServerConfig = embeddedServerConfig;
096        this.indexConfig = indexConfig;
097        this.clientConfig = clientConfig;
098        checkConfig();
099        connect();
100        initializeIndexes();
101    }
102
103    protected void checkConfig() {
104        if (clientConfig == null) {
105            throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting");
106        }
107    }
108
109    protected void connect() {
110        if (client != null) {
111            return;
112        }
113        if (embeddedServerConfig != null) {
114            embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig);
115            embeddedServer.start();
116        }
117        client = createClient(embeddedServer);
118        checkClusterHealth();
119        log.info("Elasticsearch Connected");
120    }
121
122    public void disconnect() {
123        if (client != null) {
124            try {
125                client.close();
126            } catch (Exception e) {
127                log.error("Failed to close client: " + e.getMessage(), e);
128            }
129            client = null;
130            indexInitDone = false;
131            log.info("Elasticsearch Disconnected");
132        }
133        if (embeddedServer != null) {
134            try {
135                embeddedServer.close();
136            } catch (IOException e) {
137                log.error("Failed to close embedded node: " + e.getMessage(), e);
138            }
139            embeddedServer = null;
140            log.info("Elasticsearch embedded Node Stopped");
141        }
142    }
143
144    protected ESClient createClient(ElasticSearchEmbeddedNode node) {
145        log.info("Connecting to Elasticsearch");
146        ESClient ret;
147        try {
148            ESClientFactory clientFactory = clientConfig.getKlass().newInstance();
149            ret = clientFactory.create(node, clientConfig);
150        } catch (ReflectiveOperationException e) {
151            log.error("Cannot instantiate Elasticsearch Client from class: " + clientConfig.getKlass());
152            throw new NuxeoException(e);
153        }
154        return ret;
155    }
156
157    protected void checkClusterHealth(String... indexNames) {
158        if (client == null) {
159            throw new IllegalStateException("No Elasticsearch Client available");
160        }
161        client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND);
162    }
163
164    protected void initializeIndexes() {
165        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
166            if (conf.isDocumentIndex()) {
167                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
168                indexNames.put(conf.getRepositoryName(), conf.getName());
169                repoNames.put(conf.getName(), conf.getRepositoryName());
170                Set<String> set = new LinkedHashSet<>();
171                if (includeSourceFields != null) {
172                    set.addAll(Arrays.asList(includeSourceFields));
173                }
174                set.addAll(Arrays.asList(conf.getIncludes()));
175                if (set.contains(ALL_FIELDS)) {
176                    set.clear();
177                    set.add(ALL_FIELDS);
178                }
179                includeSourceFields = set.toArray(new String[set.size()]);
180                set.clear();
181                if (excludeSourceFields != null) {
182                    set.addAll(Arrays.asList(excludeSourceFields));
183                }
184                set.addAll(Arrays.asList(conf.getExcludes()));
185                excludeSourceFields = set.toArray(new String[set.size()]);
186            }
187
188        }
189        initIndexes(false);
190    }
191
192    // Admin Impl =============================================================
193    @Override
194    public void refreshRepositoryIndex(String repositoryName) {
195        if (log.isDebugEnabled()) {
196            log.debug("Refreshing index associated with repo: " + repositoryName);
197        }
198        getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName)));
199        if (log.isDebugEnabled()) {
200            log.debug("Refreshing index done");
201        }
202    }
203
204    @Override
205    public String getIndexNameForRepository(String repositoryName) {
206        String ret = indexNames.get(repositoryName);
207        if (ret == null) {
208            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
209        }
210        return ret;
211    }
212
213    @Override
214    public String getRepositoryForIndex(String indexName) {
215        return repoNames.get(indexName);
216    }
217
218    @Override
219    public List<String> getIndexNamesForType(String type) {
220        List<String> indexNames = new ArrayList<>();
221        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
222            if (type.equals(conf.getType())) {
223                indexNames.add(conf.getName());
224            }
225        }
226        return indexNames;
227    }
228
229    @Override
230    public String getIndexNameForType(String type) {
231        List<String> indexNames = getIndexNamesForType(type);
232        if (indexNames.isEmpty()) {
233            throw new NoSuchElementException("No index defined for type: " + type);
234        }
235        return indexNames.get(0);
236    }
237
238    @Override
239    public String getWriteIndexName(String searchIndexName) {
240        return writeIndexNames.getOrDefault(searchIndexName, searchIndexName);
241    }
242
243    @Override
244    public void syncSearchAndWriteAlias(String searchIndexName) {
245        ElasticSearchIndexConfig conf = indexConfig.values()
246                                                   .stream()
247                                                   .filter(item -> item.getName().equals(searchIndexName))
248                                                   .findFirst()
249                                                   .orElseThrow(IllegalStateException::new);
250        syncSearchAndWriteAlias(conf);
251    }
252
253    @Override
254    public void flushRepositoryIndex(String repositoryName) {
255        log.warn("Flushing index associated with repo: " + repositoryName);
256        getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName)));
257        log.info("Flushing index done");
258    }
259
260    @Override
261    public void refresh() {
262        for (String repositoryName : indexNames.keySet()) {
263            refreshRepositoryIndex(repositoryName);
264        }
265    }
266
267    @Override
268    public void flush() {
269        for (String repositoryName : indexNames.keySet()) {
270            flushRepositoryIndex(repositoryName);
271        }
272    }
273
274    @Override
275    public void optimizeIndex(String indexName) {
276        log.warn("Optimizing index: " + indexName);
277        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
278            if (conf.getName().equals(indexName)) {
279                getClient().optimize(indexName);
280            }
281        }
282        log.info("Optimize done");
283    }
284
285    @Override
286    public void optimizeRepositoryIndex(String repositoryName) {
287        optimizeIndex(getIndexNameForRepository(repositoryName));
288    }
289
290    @Override
291    public void optimize() {
292        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
293            optimizeIndex(conf.getName());
294        }
295    }
296
297    @Override
298    public ESClient getClient() {
299        return client;
300    }
301
302    @Override
303    public void initIndexes(boolean dropIfExists) {
304        indexInitDone = false;
305        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
306            initIndex(conf, dropIfExists);
307        }
308        log.info("Elasticsearch Service ready");
309        indexInitDone = true;
310    }
311
312    @Override
313    public void dropAndInitIndex(String indexName) {
314        log.info("Drop and init index: " + indexName);
315        indexInitDone = false;
316        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
317            if (conf.getName().equals(indexName)) {
318                initIndex(conf, true);
319            }
320        }
321        indexInitDone = true;
322    }
323
324    @Override
325    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
326        log.info("Drop and init index of repository: " + repositoryName);
327        indexInitDone = false;
328        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
329            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
330                initIndex(conf, true, syncAlias);
331            }
332        }
333        indexInitDone = true;
334    }
335
336    @Override
337    public List<String> getRepositoryNames() {
338        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
339    }
340
341    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
342        initIndex(conf, dropIfExists, true);
343    }
344
345    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) {
346        if (conf.manageAlias()) {
347            initWriteAlias(conf, dropIfExists);
348            initSearchAlias(conf);
349            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
350            if (syncAlias) {
351                syncSearchAndWriteAlias(conf);
352            }
353        } else if (conf.hasExplicitWriteIndex()) {
354            initIndex(conf.writeIndexOrAlias(), conf, dropIfExists);
355            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
356        } else {
357            initIndex(conf.getName(), conf, dropIfExists);
358            writeIndexNames.put(conf.getName(), conf.getName());
359        }
360    }
361
362    protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) {
363        // init the write index and alias
364        String writeAlias = conf.writeIndexOrAlias();
365        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
366        String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex);
367        if (writeIndex != null && !dropIfExists) {
368            // alias exists make sure the index is well configured
369            initIndex(writeIndex, conf, false);
370        } else {
371            // create a new write index and update the alias, we don't drop anything
372            if (getClient().indexExists(nextWriteIndex)) {
373                throw new IllegalStateException(
374                        String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias));
375            }
376            initIndex(nextWriteIndex, conf, false);
377            getClient().updateAlias(writeAlias, nextWriteIndex);
378        }
379    }
380
381    protected void initSearchAlias(ElasticSearchIndexConfig conf) {
382        // init the search alias
383        String searchAlias = conf.getName();
384        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
385        String writeAlias = conf.writeIndexOrAlias();
386        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
387        if (searchIndex == null) {
388            if (getClient().indexExists(searchAlias)) {
389                if (Framework.isTestModeSet()) {
390                    // in test mode we drop an index that have the target alias name
391                    getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND);
392                }
393                searchIndex = searchAlias;
394            } else {
395                // search alias is not created, point to the write index
396                getClient().updateAlias(searchAlias, writeIndex);
397                searchIndex = writeIndex;
398            }
399        }
400        log.info(String.format("Managed index aliases: Alias: %s ->  index: %s, alias: %s ->  index: %s", searchAlias, searchIndex, writeAlias, writeIndex));
401    }
402
403    /**
404     * Update the search index to point to the write index.
405     */
406    protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) {
407        if (!conf.manageAlias()) {
408            return;
409        }
410        String searchAlias = conf.getName();
411        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
412        String writeAlias = conf.writeIndexOrAlias();
413        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
414        if (!writeIndex.equals(searchIndex)) {
415            log.warn(String.format("Updating search alias %s->%s (previously %s)", searchAlias, writeIndex, searchIndex));
416            getClient().updateAlias(searchAlias, writeIndex);
417            searchIndex = writeIndex;
418        }
419        if (searchIndex != null) {
420            repoNames.put(searchIndex, conf.getRepositoryName());
421        }
422    }
423
424    protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) {
425        if (!conf.mustCreate()) {
426            return;
427        }
428        log.info(String.format("Initialize index: %s with conf: %s, type: %s", indexName, conf.getName(),
429                conf.getType()));
430        boolean mappingExists = false;
431        boolean indexExists = getClient().indexExists(indexName);
432        if (indexExists) {
433            if (!dropIfExists) {
434                log.debug("Index " + indexName + " already exists");
435                mappingExists = getClient().mappingExists(indexName, conf.getType());
436                if (conf.isDocumentIndex()) {
437                    //Check if the index is actually an alias.
438                    String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName());
439                    if (realIndexForAlias != null) {
440                        repoNames.put(realIndexForAlias, conf.getRepositoryName());
441                    }
442                }
443            } else {
444                if (!Framework.isTestModeSet()) {
445                    log.warn(String.format(
446                            "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index",
447                            indexName, conf.getType()));
448                }
449                getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND);
450                indexExists = false;
451            }
452        }
453        if (!indexExists) {
454            log.info(String.format("Creating index: %s", indexName));
455            if (log.isDebugEnabled()) {
456                log.debug("Using settings: " + conf.getSettings());
457            }
458            getClient().createIndex(indexName, conf.getSettings());
459        }
460        if (!mappingExists) {
461            log.info(String.format("Creating mapping type: %s on index: %s", indexName, conf.getName()));
462            if (log.isDebugEnabled()) {
463                log.debug("Using mapping: " + conf.getMapping());
464            }
465            getClient().createMapping(indexName, conf.getType(), conf.getMapping());
466            if (!dropIfExists && conf.getRepositoryName() != null) {
467                repositoryInitialized.add(conf.getRepositoryName());
468            }
469        }
470        // make sure the index is ready before returning
471        checkClusterHealth(indexName);
472    }
473
474    @Override
475    public long getPendingWorkerCount() {
476        // impl of scheduling is left to the ESService
477        throw new UnsupportedOperationException("Not implemented");
478    }
479
480    @Override
481    public long getRunningWorkerCount() {
482        // impl of scheduling is left to the ESService
483        throw new UnsupportedOperationException("Not implemented");
484    }
485
486    @Override
487    public int getTotalCommandProcessed() {
488        return totalCommandProcessed.get();
489    }
490
491    @Override
492    public boolean isEmbedded() {
493        return embeddedServer != null;
494    }
495
496    @Override
497    public boolean useExternalVersion() {
498        return clientConfig.useExternalVersion();
499    }
500
501    @Override
502    public boolean isIndexingInProgress() {
503        // impl of scheduling is left to the ESService
504        throw new UnsupportedOperationException("Not implemented");
505    }
506
507    @Override
508    public ListenableFuture<Boolean> prepareWaitForIndexing() {
509        throw new UnsupportedOperationException("Not implemented");
510    }
511
512    /**
513     * Get the elastic search indexes for searches
514     */
515    protected String[] getSearchIndexes(List<String> searchRepositories) {
516        if (searchRepositories.isEmpty()) {
517            Collection<String> values = indexNames.values();
518            return values.toArray(new String[values.size()]);
519        }
520        String[] ret = new String[searchRepositories.size()];
521        int i = 0;
522        for (String repo : searchRepositories) {
523            ret[i++] = getIndexNameForRepository(repo);
524        }
525        return ret;
526    }
527
528    public boolean isReady() {
529        return indexInitDone;
530    }
531
532    protected String[] getIncludeSourceFields() {
533        return includeSourceFields;
534    }
535
536    protected String[] getExcludeSourceFields() {
537        return excludeSourceFields;
538    }
539
540    protected Map<String, String> getRepositoryMap() {
541        return repoNames;
542    }
543
544    /**
545     * Get the list of repository names that have their index created.
546     */
547    public List<String> getInitializedRepositories() {
548        return repositoryInitialized;
549    }
550}