001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.MalformedURLException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.URL;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.nio.file.Paths;
030import java.security.GeneralSecurityException;
031import java.security.KeyStore;
032
033import javax.net.ssl.SSLContext;
034
035import org.apache.commons.lang.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.apache.http.HttpHost;
039import org.apache.http.auth.AuthScope;
040import org.apache.http.auth.UsernamePasswordCredentials;
041import org.apache.http.impl.client.BasicCredentialsProvider;
042import org.apache.http.ssl.SSLContextBuilder;
043import org.apache.http.ssl.SSLContexts;
044import org.elasticsearch.client.RestClient;
045import org.elasticsearch.client.RestClientBuilder;
046import org.elasticsearch.client.RestHighLevelClient;
047import org.nuxeo.ecm.core.api.NuxeoException;
048import org.nuxeo.elasticsearch.api.ESClient;
049import org.nuxeo.elasticsearch.api.ESClientFactory;
050import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
051import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
052import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode;
053
054/**
055 * @since 9.3
056 */
057public class ESRestClientFactory implements ESClientFactory {
058    private static final Log log = LogFactory.getLog(ESRestClientFactory.class);
059
060    public static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
061
062    public static final String DEFAULT_SOCKET_TIMEOUT_MS = "20000";
063
064    public static final String CONNECTION_TIMEOUT_MS_OPT = "connection.timeout.ms";
065
066    public static final String SOCKET_TIMEOUT_MS_OPT = "socket.timeout.ms";
067
068    public static final String AUTH_USER_OPT = "username";
069
070    public static final String AUTH_PASSWORD_OPT = "password";
071
072    public static final String KEYSTORE_PATH_OPT = "keystore.path";
073
074    public static final String KEYSTORE_PASSWORD_OPT = "keystore.password";
075
076    @Override
077    public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) {
078        if (node != null) {
079            return createLocalRestClient(node.getConfig());
080        }
081        return createRestClient(config);
082    }
083
084    protected ESClient createLocalRestClient(ElasticSearchEmbeddedServerConfig serverConfig) {
085        if (!serverConfig.httpEnabled()) {
086            throw new IllegalArgumentException(
087                    "Embedded configuration has no HTTP port enable, use TransportClient instead of Rest");
088        }
089        RestClient lowLevelRestClient = RestClient.builder(
090                new HttpHost("localhost", Integer.parseInt(serverConfig.getHttpPort()))).build();
091        RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
092        // checkConnection(client);
093        return new ESRestClient(lowLevelRestClient, client);
094    }
095
096    protected ESClient createRestClient(ElasticSearchClientConfig config) {
097        String addressList = config.getOption("addressList", "");
098        if (addressList.isEmpty()) {
099            throw new IllegalArgumentException("No addressList option provided cannot connect RestClient");
100        }
101        String[] hosts = addressList.split(",");
102        HttpHost[] httpHosts = new HttpHost[hosts.length];
103        int i = 0;
104        for (String host : hosts) {
105            httpHosts[i++] = HttpHost.create(host);
106        }
107        RestClientBuilder builder = RestClient.builder(httpHosts)
108                                              .setRequestConfigCallback(
109                                                      requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(
110                                                              getConnectTimeoutMs(config)).setSocketTimeout(
111                                                                      getSocketTimeoutMs(config)))
112                                              .setMaxRetryTimeoutMillis(getConnectTimeoutMs(config));
113        if (StringUtils.isNotBlank(config.getOption(AUTH_USER_OPT))
114                || StringUtils.isNotBlank(config.getOption(KEYSTORE_PATH_OPT))) {
115            addClientCallback(config, builder);
116        }
117        RestClient lowLevelRestClient = builder.build();
118        RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
119        // checkConnection(client);
120        return new ESRestClient(lowLevelRestClient, client);
121    }
122
123    private void addClientCallback(ElasticSearchClientConfig config, RestClientBuilder builder) {
124        BasicCredentialsProvider credentialProvider = getCredentialProvider(config);
125        SSLContext sslContext = getSslContext(config);
126        builder.setHttpClientConfigCallback(httpClientBuilder -> {
127            if (sslContext != null) {
128                httpClientBuilder.setSSLContext(sslContext);
129            }
130            if (credentialProvider != null) {
131                httpClientBuilder.setDefaultCredentialsProvider(credentialProvider);
132            }
133            return httpClientBuilder;
134        });
135    }
136
137    protected BasicCredentialsProvider getCredentialProvider(ElasticSearchClientConfig config) {
138        if (StringUtils.isBlank(config.getOption(AUTH_USER_OPT))) {
139            return null;
140        }
141        String user = config.getOption(AUTH_USER_OPT);
142        String password = config.getOption(AUTH_PASSWORD_OPT);
143        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
144        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
145        return credentialsProvider;
146    }
147
148    protected SSLContext getSslContext(ElasticSearchClientConfig config) {
149        if (StringUtils.isBlank(config.getOption(KEYSTORE_PATH_OPT))) {
150            return null;
151        }
152        try {
153            Path keyStorePath = Paths.get(config.getOption(KEYSTORE_PATH_OPT));
154            String keyStorePass = config.getOption(KEYSTORE_PASSWORD_OPT);
155            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
156            try (InputStream is = Files.newInputStream(keyStorePath)) {
157                keyStore.load(is, keyStorePass.toCharArray());
158            }
159            SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(keyStore, null);
160            return sslBuilder.build();
161        } catch (GeneralSecurityException | IOException e) {
162            throw new NuxeoException("Cannot setup SSL for RestClient: " + config, e);
163        }
164
165    }
166
167    protected int getConnectTimeoutMs(ElasticSearchClientConfig config) {
168        return Integer.parseInt(config.getOption(CONNECTION_TIMEOUT_MS_OPT, DEFAULT_CONNECT_TIMEOUT_MS));
169    }
170
171    protected int getSocketTimeoutMs(ElasticSearchClientConfig config) {
172        return Integer.parseInt(config.getOption(SOCKET_TIMEOUT_MS_OPT, DEFAULT_SOCKET_TIMEOUT_MS));
173    }
174
175    protected void checkConnection(RestHighLevelClient client) {
176        boolean ping = false;
177        try {
178            ping = client.ping();
179        } catch (IOException e) {
180            log.error(e.getMessage(), e);
181        }
182        if (!ping) {
183            throw new IllegalStateException("Fail to ping rest node");
184        }
185    }
186}