001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.file.Files;
024import java.nio.file.Path;
025import java.nio.file.Paths;
026import java.security.GeneralSecurityException;
027import java.security.KeyStore;
028
029import javax.net.ssl.SSLContext;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.http.HttpHost;
035import org.apache.http.auth.AuthScope;
036import org.apache.http.auth.UsernamePasswordCredentials;
037import org.apache.http.impl.client.BasicCredentialsProvider;
038import org.apache.http.ssl.SSLContextBuilder;
039import org.apache.http.ssl.SSLContexts;
040import org.elasticsearch.client.RestClient;
041import org.elasticsearch.client.RestClientBuilder;
042import org.elasticsearch.client.RestHighLevelClient;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.elasticsearch.api.ESClient;
045import org.nuxeo.elasticsearch.api.ESClientFactory;
046import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
047import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
048import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode;
049
050/**
051 * @since 9.3
052 */
053public class ESRestClientFactory implements ESClientFactory {
054    private static final Log log = LogFactory.getLog(ESRestClientFactory.class);
055
056    public static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
057
058    public static final String DEFAULT_SOCKET_TIMEOUT_MS = "20000";
059
060    public static final String CONNECTION_TIMEOUT_MS_OPT = "connection.timeout.ms";
061
062    public static final String SOCKET_TIMEOUT_MS_OPT = "socket.timeout.ms";
063
064    public static final String AUTH_USER_OPT = "username";
065
066    public static final String AUTH_PASSWORD_OPT = "password";
067
068    public static final String KEYSTORE_PATH_OPT = "keystore.path";
069
070    public static final String KEYSTORE_PASSWORD_OPT = "keystore.password";
071
072    /**
073     * @since 9.10-HF01
074     */
075    public static final String KEYSTORE_TYPE_OPT = "keystore.type";
076
077    @Override
078    public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) {
079        if (node != null) {
080            return createLocalRestClient(node.getConfig());
081        }
082        return createRestClient(config);
083    }
084
085    protected ESClient createLocalRestClient(ElasticSearchEmbeddedServerConfig serverConfig) {
086        if (!serverConfig.httpEnabled()) {
087            throw new IllegalArgumentException(
088                    "Embedded configuration has no HTTP port enable, use TransportClient instead of Rest");
089        }
090        RestClientBuilder lowLevelRestClientBuilder = RestClient.builder(
091                new HttpHost("localhost", Integer.parseInt(serverConfig.getHttpPort())));
092        RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClientBuilder);
093        // checkConnection(client);
094        return new ESRestClient(client.getLowLevelClient(), client);
095    }
096
097    protected ESClient createRestClient(ElasticSearchClientConfig config) {
098        String addressList = config.getOption("addressList", "");
099        if (addressList.isEmpty()) {
100            throw new IllegalArgumentException("No addressList option provided cannot connect RestClient");
101        }
102        String[] hosts = addressList.split(",");
103        HttpHost[] httpHosts = new HttpHost[hosts.length];
104        int i = 0;
105        for (String host : hosts) {
106            httpHosts[i++] = HttpHost.create(host);
107        }
108        RestClientBuilder builder = RestClient.builder(httpHosts)
109                                              .setRequestConfigCallback(
110                                                      requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(
111                                                              getConnectTimeoutMs(config)).setSocketTimeout(
112                                                                      getSocketTimeoutMs(config)))
113                                              .setMaxRetryTimeoutMillis(getConnectTimeoutMs(config));
114        if (StringUtils.isNotBlank(config.getOption(AUTH_USER_OPT))
115                || StringUtils.isNotBlank(config.getOption(KEYSTORE_PATH_OPT))) {
116            addClientCallback(config, builder);
117        }
118        RestHighLevelClient client = new RestHighLevelClient(builder);
119        // checkConnection(client);
120        return new ESRestClient(client.getLowLevelClient(), client);
121    }
122
123    private void addClientCallback(ElasticSearchClientConfig config, RestClientBuilder builder) {
124        BasicCredentialsProvider credentialProvider = getCredentialProvider(config);
125        SSLContext sslContext = getSslContext(config);
126        builder.setHttpClientConfigCallback(httpClientBuilder -> {
127            if (sslContext != null) {
128                httpClientBuilder.setSSLContext(sslContext);
129            }
130            if (credentialProvider != null) {
131                httpClientBuilder.setDefaultCredentialsProvider(credentialProvider);
132            }
133            return httpClientBuilder;
134        });
135    }
136
137    protected BasicCredentialsProvider getCredentialProvider(ElasticSearchClientConfig config) {
138        if (StringUtils.isBlank(config.getOption(AUTH_USER_OPT))) {
139            return null;
140        }
141        String user = config.getOption(AUTH_USER_OPT);
142        String password = config.getOption(AUTH_PASSWORD_OPT);
143        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
144        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
145        return credentialsProvider;
146    }
147
148    protected SSLContext getSslContext(ElasticSearchClientConfig config) {
149        if (StringUtils.isBlank(config.getOption(KEYSTORE_PATH_OPT))) {
150            return null;
151        }
152        try {
153            Path keyStorePath = Paths.get(config.getOption(KEYSTORE_PATH_OPT));
154            String keyStorePass = config.getOption(KEYSTORE_PASSWORD_OPT);
155            String keyStoreType = StringUtils.defaultIfBlank(config.getOption(KEYSTORE_TYPE_OPT), KeyStore.getDefaultType());
156            char[] keyPass = StringUtils.isBlank(keyStorePass) ? null : keyStorePass.toCharArray();
157            KeyStore keyStore = KeyStore.getInstance(keyStoreType);
158            try (InputStream is = Files.newInputStream(keyStorePath)) {
159                keyStore.load(is, keyPass);
160            }
161            SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(keyStore, null);
162            return sslBuilder.build();
163        } catch (GeneralSecurityException | IOException e) {
164            throw new NuxeoException("Cannot setup SSL for RestClient: " + config, e);
165        }
166
167    }
168
169    protected int getConnectTimeoutMs(ElasticSearchClientConfig config) {
170        return Integer.parseInt(config.getOption(CONNECTION_TIMEOUT_MS_OPT, DEFAULT_CONNECT_TIMEOUT_MS));
171    }
172
173    protected int getSocketTimeoutMs(ElasticSearchClientConfig config) {
174        return Integer.parseInt(config.getOption(SOCKET_TIMEOUT_MS_OPT, DEFAULT_SOCKET_TIMEOUT_MS));
175    }
176
177    protected void checkConnection(RestHighLevelClient client) {
178        boolean ping = false;
179        try {
180            ping = client.ping();
181        } catch (IOException e) {
182            log.error(e.getMessage(), e);
183        }
184        if (!ping) {
185            throw new IllegalStateException("Fail to ping rest node");
186        }
187    }
188}