001/*
002 * (C) Copyright 2014-2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.LinkedHashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.NoSuchElementException;
035import java.util.Set;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.elasticsearch.api.ESClient;
042import org.nuxeo.elasticsearch.api.ESClientFactory;
043import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
044import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
045import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
046import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
047import org.nuxeo.runtime.api.Framework;
048
049import com.google.common.util.concurrent.ListenableFuture;
050
051/**
052 * @since 6.0
053 */
054public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
055    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
056
057    protected static final int TIMEOUT_WAIT_FOR_CLUSTER_SECOND = 30;
058
059    protected static final int TIMEOUT_DELETE_SECOND = 300;
060
061    protected final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
062
063    protected final Map<String, String> indexNames = new HashMap<>();
064
065    protected final Map<String, String> repoNames = new HashMap<>();
066
067    protected final Map<String, String> writeIndexNames = new HashMap<>();
068
069    protected final Map<String, ElasticSearchIndexConfig> indexConfig;
070
071    protected final ElasticSearchEmbeddedServerConfig embeddedServerConfig;
072
073    protected final ElasticSearchClientConfig clientConfig;
074
075    protected ElasticSearchEmbeddedNode embeddedServer;
076
077    protected ESClient client;
078
079    protected boolean indexInitDone;
080
081    protected String[] includeSourceFields;
082
083    protected String[] excludeSourceFields;
084
085    protected List<String> repositoryInitialized = new ArrayList<>();
086
087    /**
088     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
089     * The transport client initialization can be customized.
090     *
091     * @since 9.1
092     */
093    public ElasticSearchAdminImpl(ElasticSearchEmbeddedServerConfig embeddedServerConfig,
094            ElasticSearchClientConfig clientConfig, Map<String, ElasticSearchIndexConfig> indexConfig) {
095        this.embeddedServerConfig = embeddedServerConfig;
096        this.indexConfig = indexConfig;
097        this.clientConfig = clientConfig;
098        checkConfig();
099        connect();
100        initializeIndexes();
101    }
102
103    protected void checkConfig() {
104        if (clientConfig == null) {
105            throw new IllegalStateException("No Elasticsearch Client configuration provided, aborting");
106        }
107    }
108
109    protected void connect() {
110        if (client != null) {
111            return;
112        }
113        if (embeddedServerConfig != null) {
114            embeddedServer = new ElasticSearchEmbeddedNode(embeddedServerConfig);
115            embeddedServer.start();
116        }
117        client = createClient(embeddedServer);
118        checkClusterHealth();
119        log.info("Elasticsearch Connected");
120    }
121
122    public void disconnect() {
123        if (client != null) {
124            try {
125                client.close();
126            } catch (Exception e) {
127                log.error("Failed to close client: " + e.getMessage(), e);
128            }
129            client = null;
130            indexInitDone = false;
131            log.info("Elasticsearch Disconnected");
132        }
133        if (embeddedServer != null) {
134            try {
135                embeddedServer.close();
136            } catch (IOException e) {
137                log.error("Failed to close embedded node: " + e.getMessage(), e);
138            }
139            embeddedServer = null;
140            log.info("Elasticsearch embedded Node Stopped");
141        }
142    }
143
144    protected ESClient createClient(ElasticSearchEmbeddedNode node) {
145        log.info("Connecting to Elasticsearch");
146        ESClient ret;
147        try {
148            ESClientFactory clientFactory = clientConfig.getKlass().newInstance();
149            ret = clientFactory.create(node, clientConfig);
150        } catch (ReflectiveOperationException e) {
151            log.error("Cannot instantiate Elasticsearch Client from class: " + clientConfig.getKlass());
152            throw new NuxeoException(e);
153        }
154        return ret;
155    }
156
157    protected void checkClusterHealth(String... indexNames) {
158        if (client == null) {
159            throw new IllegalStateException("No Elasticsearch Client available");
160        }
161        client.waitForYellowStatus(indexNames, TIMEOUT_WAIT_FOR_CLUSTER_SECOND);
162    }
163
164    protected void initializeIndexes() {
165        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
166            if (conf.isDocumentIndex()) {
167                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
168                indexNames.put(conf.getRepositoryName(), conf.getName());
169                repoNames.put(conf.getName(), conf.getRepositoryName());
170                Set<String> set = new LinkedHashSet<>();
171                if (includeSourceFields != null) {
172                    set.addAll(Arrays.asList(includeSourceFields));
173                }
174                set.addAll(Arrays.asList(conf.getIncludes()));
175                if (set.contains(ALL_FIELDS)) {
176                    set.clear();
177                    set.add(ALL_FIELDS);
178                }
179                includeSourceFields = set.toArray(new String[set.size()]);
180                set.clear();
181                if (excludeSourceFields != null) {
182                    set.addAll(Arrays.asList(excludeSourceFields));
183                }
184                set.addAll(Arrays.asList(conf.getExcludes()));
185                excludeSourceFields = set.toArray(new String[set.size()]);
186            }
187
188        }
189        initIndexes(false);
190    }
191
192    // Admin Impl =============================================================
193    @Override
194    public void refreshRepositoryIndex(String repositoryName) {
195        if (log.isDebugEnabled()) {
196            log.debug("Refreshing index associated with repo: " + repositoryName);
197        }
198        getClient().refresh(getWriteIndexName(getIndexNameForRepository(repositoryName)));
199        if (log.isDebugEnabled()) {
200            log.debug("Refreshing index done");
201        }
202    }
203
204    @Override
205    public String getIndexNameForRepository(String repositoryName) {
206        String ret = indexNames.get(repositoryName);
207        if (ret == null) {
208            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
209        }
210        return ret;
211    }
212
213    @Override
214    public String getRepositoryForIndex(String indexName) {
215        return repoNames.get(indexName);
216    }
217
218    @Override
219    public List<String> getIndexNamesForType(String type) {
220        List<String> indexNames = new ArrayList<>();
221        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
222            if (type.equals(conf.getType())) {
223                indexNames.add(conf.getName());
224            }
225        }
226        return indexNames;
227    }
228
229    @Override
230    public String getIndexNameForType(String type) {
231        List<String> indexNames = getIndexNamesForType(type);
232        if (indexNames.isEmpty()) {
233            throw new NoSuchElementException("No index defined for type: " + type);
234        }
235        return indexNames.get(0);
236    }
237
238    @Override
239    public String getWriteIndexName(String searchIndexName) {
240        return writeIndexNames.getOrDefault(searchIndexName, searchIndexName);
241    }
242
243    @Override
244    public void syncSearchAndWriteAlias(String searchIndexName) {
245        ElasticSearchIndexConfig conf = indexConfig.values()
246                                                   .stream()
247                                                   .filter(item -> item.getName().equals(searchIndexName))
248                                                   .findFirst()
249                                                   .orElseThrow(IllegalStateException::new);
250        syncSearchAndWriteAlias(conf);
251    }
252
253    @Override
254    public void flushRepositoryIndex(String repositoryName) {
255        log.warn("Flushing index associated with repo: " + repositoryName);
256        getClient().flush(getWriteIndexName(getIndexNameForRepository(repositoryName)));
257        log.info("Flushing index done");
258    }
259
260    @Override
261    public void refresh() {
262        for (String repositoryName : indexNames.keySet()) {
263            refreshRepositoryIndex(repositoryName);
264        }
265    }
266
267    @Override
268    public void flush() {
269        for (String repositoryName : indexNames.keySet()) {
270            flushRepositoryIndex(repositoryName);
271        }
272    }
273
274    @Override
275    public void optimizeIndex(String indexName) {
276        log.warn("Optimizing index: " + indexName);
277        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
278            if (conf.getName().equals(indexName)) {
279                getClient().optimize(indexName);
280            }
281        }
282        log.info("Optimize done");
283    }
284
285    @Override
286    public void optimizeRepositoryIndex(String repositoryName) {
287        optimizeIndex(getIndexNameForRepository(repositoryName));
288    }
289
290    @Override
291    public void optimize() {
292        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
293            optimizeIndex(conf.getName());
294        }
295    }
296
297    @Override
298    public ESClient getClient() {
299        return client;
300    }
301
302    @Override
303    public void initIndexes(boolean dropIfExists) {
304        indexInitDone = false;
305        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
306            initIndex(conf, dropIfExists);
307        }
308        log.info("Elasticsearch Service ready");
309        indexInitDone = true;
310    }
311
312    @Override
313    public void dropAndInitIndex(String indexName) {
314        log.info("Drop and init index: " + indexName);
315        indexInitDone = false;
316        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
317            if (conf.getName().equals(indexName)) {
318                initIndex(conf, true);
319            }
320        }
321        indexInitDone = true;
322    }
323
324    @Override
325    public void dropAndInitRepositoryIndex(String repositoryName, boolean syncAlias) {
326        log.info("Drop and init index of repository: " + repositoryName);
327        indexInitDone = false;
328        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
329            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
330                initIndex(conf, true, syncAlias);
331            }
332        }
333        indexInitDone = true;
334    }
335
336    @Override
337    public List<String> getRepositoryNames() {
338        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
339    }
340
341    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
342        initIndex(conf, dropIfExists, true);
343    }
344
345    protected void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists, boolean syncAlias) {
346        if (conf.manageAlias()) {
347            initWriteAlias(conf, dropIfExists);
348            initSearchAlias(conf);
349            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
350            if (syncAlias) {
351                syncSearchAndWriteAlias(conf);
352            }
353        } else if (conf.hasExplicitWriteIndex()) {
354            initIndex(conf.writeIndexOrAlias(), conf, dropIfExists);
355            writeIndexNames.put(conf.getName(), conf.writeIndexOrAlias());
356        } else {
357            initIndex(conf.getName(), conf, dropIfExists);
358            writeIndexNames.put(conf.getName(), conf.getName());
359        }
360    }
361
362    protected void initWriteAlias(ElasticSearchIndexConfig conf, boolean dropIfExists) {
363        // init the write index and alias
364        String writeAlias = conf.writeIndexOrAlias();
365        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
366        String nextWriteIndex = conf.newWriteIndexForAlias(conf.getName(), writeIndex);
367        if (writeIndex != null && !dropIfExists) {
368            // alias exists make sure the index is well configured
369            initIndex(writeIndex, conf, false);
370        } else {
371            // create a new write index and update the alias, we don't drop anything
372            if (getClient().indexExists(nextWriteIndex)) {
373                throw new IllegalStateException(
374                        String.format("New index name %s for the alias %s already exists", nextWriteIndex, writeAlias));
375            }
376            initIndex(nextWriteIndex, conf, false);
377            getClient().updateAlias(writeAlias, nextWriteIndex);
378        }
379    }
380
381    protected void initSearchAlias(ElasticSearchIndexConfig conf) {
382        // init the search alias
383        String searchAlias = conf.getName();
384        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
385        String writeAlias = conf.writeIndexOrAlias();
386        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
387        if (searchIndex == null) {
388            if (getClient().indexExists(searchAlias)) {
389                if (Framework.isTestModeSet()) {
390                    // in test mode we drop an index that have the target alias name
391                    getClient().deleteIndex(searchAlias, TIMEOUT_DELETE_SECOND);
392                }
393                searchIndex = searchAlias;
394            } else {
395                // search alias is not created, point to the write index
396                getClient().updateAlias(searchAlias, writeIndex);
397                searchIndex = writeIndex;
398            }
399        }
400        log.info(String.format("Managed index aliases: Alias: %s ->  index: %s, alias: %s ->  index: %s", searchAlias,
401                searchIndex, writeAlias, writeIndex));
402    }
403
404    /**
405     * Update the search index to point to the write index.
406     */
407    protected void syncSearchAndWriteAlias(ElasticSearchIndexConfig conf) {
408        if (!conf.manageAlias()) {
409            return;
410        }
411        String searchAlias = conf.getName();
412        String searchIndex = getClient().getFirstIndexForAlias(searchAlias);
413        String writeAlias = conf.writeIndexOrAlias();
414        String writeIndex = getClient().getFirstIndexForAlias(writeAlias);
415        if (!writeIndex.equals(searchIndex)) {
416            log.warn(String.format("Updating search alias %s->%s (previously %s)", searchAlias, writeIndex,
417                    searchIndex));
418            getClient().updateAlias(searchAlias, writeIndex);
419            searchIndex = writeIndex;
420        }
421        if (searchIndex != null) {
422            repoNames.put(searchIndex, conf.getRepositoryName());
423        }
424    }
425
426    protected void initIndex(String indexName, ElasticSearchIndexConfig conf, boolean dropIfExists) {
427        if (!conf.mustCreate()) {
428            return;
429        }
430        log.info(String.format("Initialize index: %s with conf: %s, type: %s", indexName, conf.getName(),
431                conf.getType()));
432        boolean mappingExists = false;
433        boolean indexExists = getClient().indexExists(indexName);
434        if (indexExists) {
435            if (!dropIfExists) {
436                log.debug("Index " + indexName + " already exists");
437                mappingExists = getClient().mappingExists(indexName, conf.getType());
438                if (conf.isDocumentIndex()) {
439                    // Check if the index is actually an alias.
440                    String realIndexForAlias = getClient().getFirstIndexForAlias(conf.getName());
441                    if (realIndexForAlias != null) {
442                        repoNames.put(realIndexForAlias, conf.getRepositoryName());
443                    }
444                }
445            } else {
446                if (!Framework.isTestModeSet()) {
447                    log.warn(String.format(
448                            "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index",
449                            indexName, conf.getType()));
450                }
451                getClient().deleteIndex(indexName, TIMEOUT_DELETE_SECOND);
452                indexExists = false;
453            }
454        }
455        if (!indexExists) {
456            log.info(String.format("Creating index: %s", indexName));
457            if (log.isDebugEnabled()) {
458                log.debug("Using settings: " + conf.getSettings());
459            }
460            getClient().createIndex(indexName, conf.getSettings());
461        }
462        if (!mappingExists) {
463            log.info(String.format("Creating mapping type: %s on index: %s", indexName, conf.getName()));
464            if (log.isDebugEnabled()) {
465                log.debug("Using mapping: " + conf.getMapping());
466            }
467            getClient().createMapping(indexName, conf.getType(), conf.getMapping());
468            if (!dropIfExists && conf.getRepositoryName() != null) {
469                repositoryInitialized.add(conf.getRepositoryName());
470            }
471        }
472        // make sure the index is ready before returning
473        checkClusterHealth(indexName);
474    }
475
476    @Override
477    public long getPendingWorkerCount() {
478        // impl of scheduling is left to the ESService
479        throw new UnsupportedOperationException("Not implemented");
480    }
481
482    @Override
483    public long getRunningWorkerCount() {
484        // impl of scheduling is left to the ESService
485        throw new UnsupportedOperationException("Not implemented");
486    }
487
488    @Override
489    public int getTotalCommandProcessed() {
490        return totalCommandProcessed.get();
491    }
492
493    @Override
494    public boolean isEmbedded() {
495        return embeddedServer != null;
496    }
497
498    @Override
499    public boolean useExternalVersion() {
500        return clientConfig.useExternalVersion();
501    }
502
503    @Override
504    public boolean isIndexingInProgress() {
505        // impl of scheduling is left to the ESService
506        throw new UnsupportedOperationException("Not implemented");
507    }
508
509    @Override
510    public ListenableFuture<Boolean> prepareWaitForIndexing() {
511        throw new UnsupportedOperationException("Not implemented");
512    }
513
514    /**
515     * Get the elastic search indexes for searches
516     */
517    protected String[] getSearchIndexes(List<String> searchRepositories) {
518        if (searchRepositories.isEmpty()) {
519            Collection<String> values = indexNames.values();
520            return values.toArray(new String[values.size()]);
521        }
522        String[] ret = new String[searchRepositories.size()];
523        int i = 0;
524        for (String repo : searchRepositories) {
525            ret[i++] = getIndexNameForRepository(repo);
526        }
527        return ret;
528    }
529
530    public boolean isReady() {
531        return indexInitDone;
532    }
533
534    protected String[] getIncludeSourceFields() {
535        return includeSourceFields;
536    }
537
538    protected String[] getExcludeSourceFields() {
539        return excludeSourceFields;
540    }
541
542    protected Map<String, String> getRepositoryMap() {
543        return repoNames;
544    }
545
546    /**
547     * Get the list of repository names that have their index created.
548     */
549    public List<String> getInitializedRepositories() {
550        return repositoryInitialized;
551    }
552}