001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.net.InetAddress;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.NoSuchElementException;
036import java.util.Set;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
042import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
043import org.elasticsearch.client.Client;
044import org.elasticsearch.client.transport.NoNodeAvailableException;
045import org.elasticsearch.client.transport.TransportClient;
046import org.elasticsearch.cluster.health.ClusterHealthStatus;
047import org.elasticsearch.common.settings.Settings;
048import org.elasticsearch.common.settings.Settings.Builder;
049import org.elasticsearch.common.transport.InetSocketTransportAddress;
050import org.elasticsearch.node.Node;
051import org.elasticsearch.node.NodeBuilder;
052import org.nuxeo.elasticsearch.api.ESClientInitializationService;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
055import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
056import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
057import org.nuxeo.runtime.api.Framework;
058
059import com.google.common.util.concurrent.ListenableFuture;
060
061/**
062 * @since 6.0
063 */
064public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
065    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
066
067    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";
068
069    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
070
071    private final Map<String, String> indexNames = new HashMap<>();
072
073    private final Map<String, String> repoNames = new HashMap<>();
074
075    private final Map<String, ElasticSearchIndexConfig> indexConfig;
076
077    private Node localNode;
078
079    private Client client;
080
081    private boolean indexInitDone = false;
082
083    private final ElasticSearchLocalConfig localConfig;
084
085    private final ElasticSearchRemoteConfig remoteConfig;
086
087    private final ESClientInitializationService clientInitService;
088
089    private String[] includeSourceFields;
090
091    private String[] excludeSourceFields;
092
093    private boolean embedded = true;
094
095    private List<String> repositoryInitialized = new ArrayList<>();
096
097    /**
098     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
099     * 
100     * @deprecated since 9.1, use {@link #ElasticSearchAdminImpl(ElasticSearchLocalConfig, ElasticSearchRemoteConfig,
101     *             Map<String, ElasticSearchIndexConfig>, ESClientInitializationService)} instead.
102     */
103    @Deprecated
104    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
105            Map<String, ElasticSearchIndexConfig> indexConfig) {
106        this(localConfig, remoteConfig, indexConfig, null);
107    }
108
109    /**
110     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
111     * The transport client initialization can be customized.
112     * 
113     * @since 9.1
114     */
115    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
116            Map<String, ElasticSearchIndexConfig> indexConfig, ESClientInitializationService clientInitService) {
117        this.remoteConfig = remoteConfig;
118        this.localConfig = localConfig;
119        this.indexConfig = indexConfig;
120        this.clientInitService = clientInitService;
121        connect();
122        initializeIndexes();
123    }
124
125    private void connect() {
126        if (client != null) {
127            return;
128        }
129        if (remoteConfig != null) {
130            client = connectToRemote(remoteConfig);
131            embedded = false;
132        } else {
133            localNode = createEmbeddedNode(localConfig);
134            client = connectToEmbedded();
135            embedded = true;
136        }
137        checkClusterHealth();
138        log.info("ES Connected");
139    }
140
141    public void disconnect() {
142        if (client != null) {
143            client.close();
144            client = null;
145            indexInitDone = false;
146            log.info("ES Disconnected");
147        }
148        if (localNode != null) {
149            localNode.close();
150            localNode = null;
151            log.info("ES embedded Node Stopped");
152        }
153    }
154
155    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
156        log.info("ES embedded Node Initializing (local in JVM)");
157        if (conf == null) {
158            throw new IllegalStateException("No embedded configuration defined");
159        }
160        if (!Framework.isTestModeSet()) {
161            log.warn("Elasticsearch embedded configuration is ONLY for testing"
162                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
163        }
164        Builder sBuilder = Settings.settingsBuilder();
165        sBuilder.put("http.enabled", conf.httpEnabled())
166                .put("network.host", conf.getNetworkHost())
167                .put("path.home", conf.getHomePath())
168                .put("path.data", conf.getDataPath())
169                .put("index.number_of_shards", 1)
170                .put("index.number_of_replicas", 0)
171                .put("cluster.name", conf.getClusterName())
172                .put("node.name", conf.getNodeName())
173                .put("http.netty.worker_count", 4)
174                .put("http.cors.enabled", true)
175                .put("http.cors.allow-origin", "*")
176                .put("http.cors.allow-credentials", true)
177                .put("http.cors.allow-headers", "Authorization, X-Requested-With, Content-Type, Content-Length")
178                .put("cluster.routing.allocation.disk.threshold_enabled", false)
179                .put("http.port", conf.getHttpPort());
180        if (conf.getIndexStorageType() != null) {
181            sBuilder.put("index.store.type", conf.getIndexStorageType());
182        }
183        Settings settings = sBuilder.build();
184        log.debug("Using settings: " + settings.toDelimitedString(','));
185        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
186        assert ret != null : "Can not create an embedded ES Node";
187        return ret;
188    }
189
190    private Client connectToEmbedded() {
191        log.info("Connecting to embedded ES");
192        Client ret = localNode.start().client();
193        assert ret != null : "Can not connect to embedded ES Node";
194        return ret;
195    }
196
197    private Client connectToRemote(ElasticSearchRemoteConfig config) {
198        log.info("Connecting to remote ES cluster: " + config);
199
200        Settings settings = clientInitService.initializeSettings(config);
201        if (log.isDebugEnabled()) {
202            log.debug("Using settings: " + settings.toDelimitedString(','));
203        }
204
205        TransportClient ret = clientInitService.initializeClient(settings);
206
207        String[] addresses = config.getAddresses();
208        if (addresses == null) {
209            log.error("You need to provide an addressList to join a cluster");
210        } else {
211            for (String item : config.getAddresses()) {
212                String[] address = item.split(":");
213                log.debug("Add transport address: " + item);
214                try {
215                    InetAddress inet = InetAddress.getByName(address[0]);
216                    ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
217                } catch (UnknownHostException e) {
218                    log.error("Unable to resolve host " + address[0], e);
219                }
220            }
221        }
222        assert ret != null : "Unable to create a remote client";
223        return ret;
224    }
225
226    private void checkClusterHealth(String... indexNames) {
227        if (client == null) {
228            throw new IllegalStateException("No es client available");
229        }
230        String errorMessage = null;
231        try {
232            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
233            ClusterHealthResponse ret = client.admin()
234                                              .cluster()
235                                              .prepareHealth(indexNames)
236                                              .setTimeout(TIMEOUT_WAIT_FOR_CLUSTER)
237                                              .setWaitForYellowStatus()
238                                              .get();
239            if (ret.isTimedOut()) {
240                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
241                        + ret;
242            } else {
243                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
244                    log.warn("Es Cluster ready but not GREEN: " + ret);
245                } else {
246                    log.info("ES Cluster ready: " + ret);
247                }
248            }
249        } catch (NoNodeAvailableException e) {
250            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
251        }
252        if (errorMessage != null) {
253            log.error(errorMessage);
254            throw new RuntimeException(errorMessage);
255        }
256    }
257
258    private void initializeIndexes() {
259        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
260            if (conf.isDocumentIndex()) {
261                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
262                indexNames.put(conf.getRepositoryName(), conf.getName());
263                repoNames.put(conf.getName(), conf.getRepositoryName());
264                Set<String> set = new LinkedHashSet<>();
265                if (includeSourceFields != null) {
266                    set.addAll(Arrays.asList(includeSourceFields));
267                }
268                set.addAll(Arrays.asList(conf.getIncludes()));
269                if (set.contains(ALL_FIELDS)) {
270                    set.clear();
271                    set.add(ALL_FIELDS);
272                }
273                includeSourceFields = set.toArray(new String[set.size()]);
274                set.clear();
275                if (excludeSourceFields != null) {
276                    set.addAll(Arrays.asList(excludeSourceFields));
277                }
278                set.addAll(Arrays.asList(conf.getExcludes()));
279                excludeSourceFields = set.toArray(new String[set.size()]);
280            }
281
282        }
283        initIndexes(false);
284    }
285
286    // Admin Impl =============================================================
287    @Override
288    public void refreshRepositoryIndex(String repositoryName) {
289        if (log.isDebugEnabled()) {
290            log.debug("Refreshing index associated with repo: " + repositoryName);
291        }
292        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
293        if (log.isDebugEnabled()) {
294            log.debug("Refreshing index done");
295        }
296    }
297
298    @Override
299    public String getIndexNameForRepository(String repositoryName) {
300        String ret = indexNames.get(repositoryName);
301        if (ret == null) {
302            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
303        }
304        return ret;
305    }
306
307    @Override
308    public List<String> getIndexNamesForType(String type) {
309        List<String> indexNames = new ArrayList<>();
310        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
311            if (type.equals(conf.getType())) {
312                indexNames.add(conf.getName());
313            }
314        }
315        return indexNames;
316    }
317
318    @Override
319    public String getIndexNameForType(String type) {
320        List<String> indexNames = getIndexNamesForType(type);
321        if (indexNames.isEmpty()) {
322            throw new NoSuchElementException("No index defined for type: " + type);
323        }
324        return indexNames.get(0);
325    }
326
327    @Override
328    public void flushRepositoryIndex(String repositoryName) {
329        log.warn("Flushing index associated with repo: " + repositoryName);
330        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
331        log.info("Flushing index done");
332    }
333
334    @Override
335    public void refresh() {
336        for (String repositoryName : indexNames.keySet()) {
337            refreshRepositoryIndex(repositoryName);
338        }
339    }
340
341    @Override
342    public void flush() {
343        for (String repositoryName : indexNames.keySet()) {
344            flushRepositoryIndex(repositoryName);
345        }
346    }
347
348    @Override
349    public void optimizeIndex(String indexName) {
350        log.warn("Optimizing index: " + indexName);
351        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
352            if (conf.getName().equals(indexName)) {
353                getClient().admin().indices().prepareForceMerge(indexName).get();
354            }
355        }
356        log.info("Optimize done");
357    }
358
359    @Override
360    public void optimizeRepositoryIndex(String repositoryName) {
361        optimizeIndex(getIndexNameForRepository(repositoryName));
362    }
363
364    @Override
365    public void optimize() {
366        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
367            optimizeIndex(conf.getName());
368        }
369    }
370
371    @Override
372    public Client getClient() {
373        return client;
374    }
375
376    @Override
377    public void initIndexes(boolean dropIfExists) {
378        indexInitDone = false;
379        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
380            initIndex(conf, dropIfExists);
381        }
382        log.info("ES Service ready");
383        indexInitDone = true;
384    }
385
386    @Override
387    public void dropAndInitIndex(String indexName) {
388        log.info("Drop and init index: " + indexName);
389        indexInitDone = false;
390        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
391            if (conf.getName().equals(indexName)) {
392                initIndex(conf, true);
393            }
394        }
395        indexInitDone = true;
396    }
397
398    @Override
399    public void dropAndInitRepositoryIndex(String repositoryName) {
400        log.info("Drop and init index of repository: " + repositoryName);
401        indexInitDone = false;
402        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
403            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
404                initIndex(conf, true);
405            }
406        }
407        indexInitDone = true;
408    }
409
410    @Override
411    public List<String> getRepositoryNames() {
412        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
413    }
414
415    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
416        if (!conf.mustCreate()) {
417            return;
418        }
419        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
420        boolean mappingExists = false;
421        boolean indexExists = getClient().admin()
422                                         .indices()
423                                         .prepareExists(conf.getName())
424                                         .execute()
425                                         .actionGet()
426                                         .isExists();
427        if (indexExists) {
428            if (!dropIfExists) {
429                log.debug("Index " + conf.getName() + " already exists");
430                mappingExists = getClient().admin()
431                                           .indices()
432                                           .prepareGetMappings(conf.getName())
433                                           .execute()
434                                           .actionGet()
435                                           .getMappings()
436                                           .get(conf.getName())
437                                           .containsKey(conf.getType());
438            } else {
439                if (!Framework.isTestModeSet()) {
440                    log.warn(String.format(
441                            "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index",
442                            conf.getName(), conf.getType()));
443                }
444                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
445                indexExists = false;
446            }
447        }
448        if (!indexExists) {
449            log.info(String.format("Creating index: %s", conf.getName()));
450            if (log.isDebugEnabled()) {
451                log.debug("Using settings: " + conf.getSettings());
452            }
453            getClient().admin()
454                       .indices()
455                       .prepareCreate(conf.getName())
456                       .setSettings(conf.getSettings())
457                       .execute()
458                       .actionGet();
459        }
460        if (!mappingExists) {
461            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
462            if (log.isDebugEnabled()) {
463                log.debug("Using mapping: " + conf.getMapping());
464            }
465            getClient().admin()
466                       .indices()
467                       .preparePutMapping(conf.getName())
468                       .setType(conf.getType())
469                       .setSource(conf.getMapping())
470                       .execute()
471                       .actionGet();
472            if (!dropIfExists && conf.getRepositoryName() != null) {
473                repositoryInitialized.add(conf.getRepositoryName());
474            }
475        }
476        // make sure the index is ready before returning
477        checkClusterHealth(conf.getName());
478    }
479
480    @Override
481    public long getPendingWorkerCount() {
482        // impl of scheduling is left to the ESService
483        throw new UnsupportedOperationException("Not implemented");
484    }
485
486    @Override
487    public long getRunningWorkerCount() {
488        // impl of scheduling is left to the ESService
489        throw new UnsupportedOperationException("Not implemented");
490    }
491
492    @Override
493    public int getTotalCommandProcessed() {
494        return totalCommandProcessed.get();
495    }
496
497    @Override
498    public boolean isEmbedded() {
499        return embedded;
500    }
501
502    @Override
503    public boolean useExternalVersion() {
504        if (isEmbedded()) {
505            return localConfig.useExternalVersion();
506        }
507        return remoteConfig.useExternalVersion();
508    }
509
510    @Override
511    public boolean isIndexingInProgress() {
512        // impl of scheduling is left to the ESService
513        throw new UnsupportedOperationException("Not implemented");
514    }
515
516    @Override
517    public ListenableFuture<Boolean> prepareWaitForIndexing() {
518        throw new UnsupportedOperationException("Not implemented");
519    }
520
521    /**
522     * Get the elastic search indexes for searches
523     */
524    String[] getSearchIndexes(List<String> searchRepositories) {
525        if (searchRepositories.isEmpty()) {
526            Collection<String> values = indexNames.values();
527            return values.toArray(new String[values.size()]);
528        }
529        String[] ret = new String[searchRepositories.size()];
530        int i = 0;
531        for (String repo : searchRepositories) {
532            ret[i++] = getIndexNameForRepository(repo);
533        }
534        return ret;
535    }
536
537    public boolean isReady() {
538        return indexInitDone;
539    }
540
541    String[] getIncludeSourceFields() {
542        return includeSourceFields;
543    }
544
545    String[] getExcludeSourceFields() {
546        return excludeSourceFields;
547    }
548
549    Map<String, String> getRepositoryMap() {
550        return repoNames;
551    }
552
553    /**
554     * Get the list of repository names that have their index created.
555     */
556    public List<String> getInitializedRepositories() {
557        return repositoryInitialized;
558    }
559}