001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.client; 022 023import java.net.InetAddress; 024import java.net.UnknownHostException; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.client.transport.TransportClient; 029import org.elasticsearch.common.settings.Settings; 030import org.elasticsearch.common.transport.InetSocketTransportAddress; 031import org.elasticsearch.transport.client.PreBuiltTransportClient; 032import org.nuxeo.elasticsearch.api.ESClient; 033import org.nuxeo.elasticsearch.api.ESClientFactory; 034import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig; 035import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode; 036 037/** 038 * @since 9.3 039 */ 040public class ESTransportClientFactory implements ESClientFactory { 041 private static final Log log = LogFactory.getLog(ESTransportClientFactory.class); 042 043 public ESTransportClientFactory() { 044 } 045 046 public Settings.Builder getSetting(ElasticSearchClientConfig config) { 047 return Settings.builder() 048 .put("cluster.name", config.getOption("clusterName", "elasticsearch")) 049 .put("client.transport.nodes_sampler_interval", 050 config.getOption("clientTransportNodesSamplerInterval", "5s")) 051 .put("client.transport.ping_timeout", config.getOption("clientTransportPingTimeout", "5s")) 052 .put("client.transport.ignore_cluster_name", 053 config.getOption("clientTransportIgnoreClusterName", "false")) 054 .put("client.transport.sniff", config.getOption("clientTransportSniff", "true")); 055 } 056 057 @Override 058 public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) { 059 log.info("Creating an Elasticsearch TransportClient"); 060 if (node == null) { 061 return createRemoteClient(config); 062 } 063 return createLocalClient(node); 064 } 065 066 protected ESClient createRemoteClient(ElasticSearchClientConfig config) { 067 Settings settings = getSetting(config).build(); 068 log.debug("Using settings: " + settings.toDelimitedString(',')); 069 TransportClient client = new PreBuiltTransportClient(settings); 070 String[] addresses = config.getOption("addressList", "").split(","); 071 if (addresses.length == 0) { 072 throw new IllegalArgumentException("No addressList option provided cannot connect TransportClient"); 073 } else { 074 for (String item : addresses) { 075 String[] address = item.split(":"); 076 log.debug("Add transport address: " + item); 077 try { 078 InetAddress inet = InetAddress.getByName(address[0]); 079 client.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1]))); 080 } catch (UnknownHostException e) { 081 log.error("Unable to resolve host " + address[0], e); 082 } 083 } 084 } 085 return new ESTransportClient(client); 086 } 087 088 protected ESClient createLocalClient(ElasticSearchEmbeddedNode node) { 089 log.info("Creating a TransportClient to a local Elasticsearch"); 090 return new ESTransportClient(node.getNode().client()); 091 } 092}