001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.client; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.net.MalformedURLException; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.URL; 027import java.nio.file.Files; 028import java.nio.file.Path; 029import java.nio.file.Paths; 030import java.security.GeneralSecurityException; 031import java.security.KeyStore; 032 033import javax.net.ssl.SSLContext; 034 035import org.apache.commons.lang.StringUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.apache.http.HttpHost; 039import org.apache.http.auth.AuthScope; 040import org.apache.http.auth.UsernamePasswordCredentials; 041import org.apache.http.impl.client.BasicCredentialsProvider; 042import org.apache.http.ssl.SSLContextBuilder; 043import org.apache.http.ssl.SSLContexts; 044import org.elasticsearch.client.RestClient; 045import org.elasticsearch.client.RestClientBuilder; 046import org.elasticsearch.client.RestHighLevelClient; 047import org.nuxeo.ecm.core.api.NuxeoException; 048import org.nuxeo.elasticsearch.api.ESClient; 049import org.nuxeo.elasticsearch.api.ESClientFactory; 050import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 051import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 052import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode; 053 054/** 055 * @since 9.3 056 */ 057public class ESRestClientFactory implements ESClientFactory { 058 private static final Log log = LogFactory.getLog(ESRestClientFactory.class); 059 060 public static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000"; 061 062 public static final String DEFAULT_SOCKET_TIMEOUT_MS = "20000"; 063 064 public static final String CONNECTION_TIMEOUT_MS_OPT = "connection.timeout.ms"; 065 066 public static final String SOCKET_TIMEOUT_MS_OPT = "socket.timeout.ms"; 067 068 public static final String AUTH_USER_OPT = "username"; 069 070 public static final String AUTH_PASSWORD_OPT = "password"; 071 072 public static final String KEYSTORE_PATH_OPT = "keystore.path"; 073 074 public static final String KEYSTORE_PASSWORD_OPT = "keystore.password"; 075 076 @Override 077 public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) { 078 if (node != null) { 079 return createLocalRestClient(node.getConfig()); 080 } 081 return createRestClient(config); 082 } 083 084 protected ESClient createLocalRestClient(ElasticSearchEmbeddedServerConfig serverConfig) { 085 if (!serverConfig.httpEnabled()) { 086 throw new IllegalArgumentException( 087 "Embedded configuration has no HTTP port enable, use TransportClient instead of Rest"); 088 } 089 RestClient lowLevelRestClient = RestClient.builder( 090 new HttpHost("localhost", Integer.parseInt(serverConfig.getHttpPort()))).build(); 091 RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient); 092 // checkConnection(client); 093 return new ESRestClient(lowLevelRestClient, client); 094 } 095 096 protected ESClient createRestClient(ElasticSearchClientConfig config) { 097 String addressList = config.getOption("addressList", ""); 098 if (addressList.isEmpty()) { 099 throw new IllegalArgumentException("No addressList option provided cannot connect RestClient"); 100 } 101 String[] hosts = addressList.split(","); 102 HttpHost[] httpHosts = new HttpHost[hosts.length]; 103 int i = 0; 104 for (String host : hosts) { 105 httpHosts[i++] = HttpHost.create(host); 106 } 107 RestClientBuilder builder = RestClient.builder(httpHosts) 108 .setRequestConfigCallback( 109 requestConfigBuilder -> requestConfigBuilder.setConnectTimeout( 110 getConnectTimeoutMs(config)).setSocketTimeout( 111 getSocketTimeoutMs(config))) 112 .setMaxRetryTimeoutMillis(getConnectTimeoutMs(config)); 113 if (StringUtils.isNotBlank(config.getOption(AUTH_USER_OPT)) 114 || StringUtils.isNotBlank(config.getOption(KEYSTORE_PATH_OPT))) { 115 addClientCallback(config, builder); 116 } 117 RestClient lowLevelRestClient = builder.build(); 118 RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient); 119 // checkConnection(client); 120 return new ESRestClient(lowLevelRestClient, client); 121 } 122 123 private void addClientCallback(ElasticSearchClientConfig config, RestClientBuilder builder) { 124 BasicCredentialsProvider credentialProvider = getCredentialProvider(config); 125 SSLContext sslContext = getSslContext(config); 126 builder.setHttpClientConfigCallback(httpClientBuilder -> { 127 if (sslContext != null) { 128 httpClientBuilder.setSSLContext(sslContext); 129 } 130 if (credentialProvider != null) { 131 httpClientBuilder.setDefaultCredentialsProvider(credentialProvider); 132 } 133 return httpClientBuilder; 134 }); 135 } 136 137 protected BasicCredentialsProvider getCredentialProvider(ElasticSearchClientConfig config) { 138 if (StringUtils.isBlank(config.getOption(AUTH_USER_OPT))) { 139 return null; 140 } 141 String user = config.getOption(AUTH_USER_OPT); 142 String password = config.getOption(AUTH_PASSWORD_OPT); 143 BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 144 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); 145 return credentialsProvider; 146 } 147 148 protected SSLContext getSslContext(ElasticSearchClientConfig config) { 149 if (StringUtils.isBlank(config.getOption(KEYSTORE_PATH_OPT))) { 150 return null; 151 } 152 try { 153 Path keyStorePath = Paths.get(config.getOption(KEYSTORE_PATH_OPT)); 154 String keyStorePass = config.getOption(KEYSTORE_PASSWORD_OPT); 155 KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); 156 try (InputStream is = Files.newInputStream(keyStorePath)) { 157 keyStore.load(is, keyStorePass.toCharArray()); 158 } 159 SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(keyStore, null); 160 return sslBuilder.build(); 161 } catch (GeneralSecurityException | IOException e) { 162 throw new NuxeoException("Cannot setup SSL for RestClient: " + config, e); 163 } 164 165 } 166 167 protected int getConnectTimeoutMs(ElasticSearchClientConfig config) { 168 return Integer.parseInt(config.getOption(CONNECTION_TIMEOUT_MS_OPT, DEFAULT_CONNECT_TIMEOUT_MS)); 169 } 170 171 protected int getSocketTimeoutMs(ElasticSearchClientConfig config) { 172 return Integer.parseInt(config.getOption(SOCKET_TIMEOUT_MS_OPT, DEFAULT_SOCKET_TIMEOUT_MS)); 173 } 174 175 protected void checkConnection(RestHighLevelClient client) { 176 boolean ping = false; 177 try { 178 ping = client.ping(); 179 } catch (IOException e) { 180 log.error(e.getMessage(), e); 181 } 182 if (!ping) { 183 throw new IllegalStateException("Fail to ping rest node"); 184 } 185 } 186}