001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.file.Files;
024import java.nio.file.Paths;
025import java.security.GeneralSecurityException;
026import java.security.KeyStore;
027
028import javax.net.ssl.SSLContext;
029
030import org.apache.commons.lang3.StringUtils;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.apache.http.HttpHost;
034import org.apache.http.auth.AuthScope;
035import org.apache.http.auth.UsernamePasswordCredentials;
036import org.apache.http.impl.client.BasicCredentialsProvider;
037import org.apache.http.ssl.SSLContextBuilder;
038import org.apache.http.ssl.SSLContexts;
039import org.elasticsearch.client.RequestOptions;
040import org.elasticsearch.client.RestClient;
041import org.elasticsearch.client.RestClientBuilder;
042import org.elasticsearch.client.RestHighLevelClient;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.elasticsearch.api.ESClient;
045import org.nuxeo.elasticsearch.api.ESClientFactory;
046import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
047import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig;
048import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * @since 9.3
053 */
054public class ESRestClientFactory implements ESClientFactory {
055    private static final Log log = LogFactory.getLog(ESRestClientFactory.class);
056
057    public static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
058
059    public static final String DEFAULT_SOCKET_TIMEOUT_MS = "20000";
060
061    public static final String CONNECTION_TIMEOUT_MS_OPT = "connection.timeout.ms";
062
063    public static final String SOCKET_TIMEOUT_MS_OPT = "socket.timeout.ms";
064
065    public static final String AUTH_USER_OPT = "username";
066
067    public static final String AUTH_PASSWORD_OPT = "password";
068
069    /** @since 10.3 */
070    public static final String TRUST_STORE_PATH_OPT = "trustStorePath";
071
072    /** @since 10.3 */
073    public static final String TRUST_STORE_PASSWORD_OPT = "trustStorePassword";
074
075    /** @since 10.3 */
076    public static final String TRUST_STORE_TYPE_OPT = "trustStoreType";
077
078    /** @since 10.3 */
079    public static final String KEY_STORE_PATH_OPT = "keyStorePath";
080
081    /** @since 10.3 */
082    public static final String KEY_STORE_PASSWORD_OPT = "keyStorePassword";
083
084    /** @since 10.3 */
085    public static final String KEY_STORE_TYPE_OPT = "keyStoreType";
086
087    /** @deprecated since 10.3, misnamed, use {@link #TRUST_STORE_PATH_OPT} instead */
088    @Deprecated
089    public static final String DEPRECATED_TRUST_STORE_PATH_OPT = "keystore.path";
090
091    /** @deprecated since 10.3, misnamed, use {@link #TRUST_STORE_PATH_OPT} instead */
092    @Deprecated
093    public static final String DEPRECATED_TRUST_STORE_PASSWORD_OPT = "keystore.password";
094
095    /**
096     * @since 9.10-HF01
097     * @deprecated since 10.3, misnamed, use {@link #TRUST_STORE_PATH_OPT} instead
098     */
099    @Deprecated
100    public static final String DEPRECATED_TRUST_STORE_TYPE_OPT = "keystore.type";
101
102    @Override
103    public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) {
104        if (node != null) {
105            return createLocalRestClient(node.getConfig());
106        }
107        return createRestClient(config);
108    }
109
110    protected ESClient createLocalRestClient(ElasticSearchEmbeddedServerConfig serverConfig) {
111        if (!serverConfig.httpEnabled()) {
112            throw new IllegalArgumentException(
113                    "Embedded configuration has no HTTP port enable, use TransportClient instead of Rest");
114        }
115        RestClientBuilder lowLevelRestClientBuilder = RestClient.builder(
116                new HttpHost("localhost", Integer.parseInt(serverConfig.getHttpPort())));
117        RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClientBuilder); // NOSONAR (factory)
118        // checkConnection(client);
119        return new ESRestClient(client.getLowLevelClient(), client);
120    }
121
122    protected ESClient createRestClient(ElasticSearchClientConfig config) {
123        String addressList = config.getOption("addressList", "");
124        if (addressList.isEmpty()) {
125            throw new IllegalArgumentException("No addressList option provided cannot connect RestClient");
126        }
127        String[] hosts = addressList.split(",");
128        HttpHost[] httpHosts = new HttpHost[hosts.length];
129        int i = 0;
130        for (String host : hosts) {
131            httpHosts[i++] = HttpHost.create(host);
132        }
133        RestClientBuilder builder = RestClient.builder(httpHosts)
134                                              .setRequestConfigCallback(
135                                                      requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(
136                                                              getConnectTimeoutMs(config))
137                                                                                                  .setSocketTimeout(
138                                                                                                          getSocketTimeoutMs(
139                                                                                                                  config)))
140                                              .setMaxRetryTimeoutMillis(getConnectTimeoutMs(config));
141        addClientCallback(config, builder);
142        RestHighLevelClient client = new RestHighLevelClient(builder); // NOSONAR (factory)
143        // checkConnection(client);
144        return new ESRestClient(client.getLowLevelClient(), client);
145    }
146
147    private void addClientCallback(ElasticSearchClientConfig config, RestClientBuilder builder) {
148        BasicCredentialsProvider credentialProvider = getCredentialProvider(config);
149        SSLContext sslContext = getSslContext(config);
150        if (sslContext == null && credentialProvider == null) {
151            return;
152        }
153        builder.setHttpClientConfigCallback(httpClientBuilder -> {
154            httpClientBuilder.setSSLContext(sslContext);
155            httpClientBuilder.setDefaultCredentialsProvider(credentialProvider);
156            return httpClientBuilder;
157        });
158    }
159
160    protected BasicCredentialsProvider getCredentialProvider(ElasticSearchClientConfig config) {
161        if (StringUtils.isBlank(config.getOption(AUTH_USER_OPT))) {
162            return null;
163        }
164        String user = config.getOption(AUTH_USER_OPT);
165        String password = config.getOption(AUTH_PASSWORD_OPT);
166        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
167        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
168        return credentialsProvider;
169    }
170
171    protected SSLContext getSslContext(ElasticSearchClientConfig config) {
172        checkDeprecatedProperties();
173        String trustStorePath = StringUtils.defaultIfBlank(config.getOption(TRUST_STORE_PATH_OPT),
174                config.getOption(DEPRECATED_TRUST_STORE_PATH_OPT));
175        String trustStorePassword = StringUtils.defaultIfBlank(config.getOption(TRUST_STORE_PASSWORD_OPT),
176                config.getOption(DEPRECATED_TRUST_STORE_PASSWORD_OPT));
177        String trustStoreType = StringUtils.defaultIfBlank(config.getOption(TRUST_STORE_TYPE_OPT),
178                config.getOption(DEPRECATED_TRUST_STORE_TYPE_OPT));
179        String keyStorePath = config.getOption(KEY_STORE_PATH_OPT);
180        String keyStorePassword = config.getOption(KEY_STORE_PASSWORD_OPT);
181        String keyStoreType = config.getOption(KEY_STORE_TYPE_OPT);
182        try {
183            KeyStore trustStore = loadKeyStore(trustStorePath, trustStorePassword, trustStoreType);
184            KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword, keyStoreType);
185            if (trustStore == null && keyStore == null) {
186                return null;
187            }
188            SSLContextBuilder sslContextBuilder = SSLContexts.custom();
189            if (trustStore != null) {
190                sslContextBuilder.loadTrustMaterial(trustStore, null);
191            }
192            if (keyStore != null) {
193                sslContextBuilder.loadKeyMaterial(keyStore, null);
194            }
195            return sslContextBuilder.build();
196        } catch (GeneralSecurityException | IOException e) {
197            throw new NuxeoException("Cannot setup SSL for RestClient: " + config, e);
198        }
199    }
200
201    // deprecated and new system properties, used in warnings only
202    // actual values are used in templates and accessed through the options map of ElasticSearchClientConfig
203
204    protected static final String DEPRECATED_ES_TRUST_STORE_PATH_PROP = "elasticsearch.restClient.keystorePath";
205
206    protected static final String DEPRECATED_ES_TRUST_STORE_PASSWORD_PROP = "elasticsearch.restClient.keystorePassword";
207
208    protected static final String DEPRECATED_ES_TRUST_STORE_TYPE_PROP = "elasticsearch.restClient.keystoreType";
209
210    protected static final String ES_TRUST_STORE_PATH_PROP = "elasticsearch.restClient.truststore.path";
211
212    protected static final String ES_TRUST_STORE_PASSWORD_PROP = "elasticsearch.restClient.truststore.password";
213
214    protected static final String ES_TRUST_STORE_TYPE_PROP = "elasticsearch.restClient.truststore.type";
215
216    protected void checkDeprecatedProperties() {
217        checkDeprecatedProperty(DEPRECATED_ES_TRUST_STORE_PATH_PROP, ES_TRUST_STORE_PATH_PROP);
218        checkDeprecatedProperty(DEPRECATED_ES_TRUST_STORE_PASSWORD_PROP, ES_TRUST_STORE_PASSWORD_PROP);
219        checkDeprecatedProperty(DEPRECATED_ES_TRUST_STORE_TYPE_PROP, ES_TRUST_STORE_TYPE_PROP);
220    }
221
222    protected void checkDeprecatedProperty(String oldProp, String newProp) {
223        if (Framework.getRuntime() == null) {
224            // unit tests
225            return;
226        }
227        if (StringUtils.isNotBlank(Framework.getProperty(oldProp))) {
228            log.warn("Configuration property " + oldProp + " is deprecated, use " + newProp + " instead");
229        }
230    }
231
232    protected KeyStore loadKeyStore(String path, String password, String type)
233            throws GeneralSecurityException, IOException {
234        if (StringUtils.isBlank(path)) {
235            return null;
236        }
237        String keyStoreType = StringUtils.defaultIfBlank(type, KeyStore.getDefaultType());
238        KeyStore keyStore = KeyStore.getInstance(keyStoreType);
239        char[] passwordChars = StringUtils.isBlank(password) ? null : password.toCharArray();
240        try (InputStream is = Files.newInputStream(Paths.get(path))) {
241            keyStore.load(is, passwordChars);
242        }
243        return keyStore;
244    }
245
246    protected int getConnectTimeoutMs(ElasticSearchClientConfig config) {
247        return Integer.parseInt(config.getOption(CONNECTION_TIMEOUT_MS_OPT, DEFAULT_CONNECT_TIMEOUT_MS));
248    }
249
250    protected int getSocketTimeoutMs(ElasticSearchClientConfig config) {
251        return Integer.parseInt(config.getOption(SOCKET_TIMEOUT_MS_OPT, DEFAULT_SOCKET_TIMEOUT_MS));
252    }
253
254    protected void checkConnection(RestHighLevelClient client) {
255        boolean ping = false;
256        try {
257            ping = client.ping(RequestOptions.DEFAULT);
258        } catch (IOException e) {
259            log.error(e.getMessage(), e);
260        }
261        if (!ping) {
262            throw new IllegalStateException("Fail to ping rest node");
263        }
264    }
265}