001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.client;
022
023import java.net.InetAddress;
024import java.net.UnknownHostException;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.client.transport.TransportClient;
029import org.elasticsearch.common.settings.Settings;
030import org.elasticsearch.common.transport.TransportAddress;
031import org.elasticsearch.transport.client.PreBuiltTransportClient;
032import org.nuxeo.elasticsearch.api.ESClient;
033import org.nuxeo.elasticsearch.api.ESClientFactory;
034import org.nuxeo.elasticsearch.config.ElasticSearchClientConfig;
035import org.nuxeo.elasticsearch.core.ElasticSearchEmbeddedNode;
036
037/**
038 * @since 9.3
039 */
040public class ESTransportClientFactory implements ESClientFactory {
041    private static final Log log = LogFactory.getLog(ESTransportClientFactory.class);
042
043    public ESTransportClientFactory() {
044    }
045
046    public Settings.Builder getSetting(ElasticSearchClientConfig config) {
047        return Settings.builder()
048                       .put("cluster.name", config.getOption("clusterName", "elasticsearch"))
049                       .put("client.transport.nodes_sampler_interval",
050                               config.getOption("clientTransportNodesSamplerInterval", "5s"))
051                       .put("client.transport.ping_timeout", config.getOption("clientTransportPingTimeout", "5s"))
052                       .put("client.transport.ignore_cluster_name",
053                               config.getOption("clientTransportIgnoreClusterName", "false"))
054                       .put("client.transport.sniff", config.getOption("clientTransportSniff", "true"));
055    }
056
057    @Override
058    public ESClient create(ElasticSearchEmbeddedNode node, ElasticSearchClientConfig config) {
059        log.info("Creating an Elasticsearch TransportClient");
060        if (node == null) {
061            return createRemoteClient(config);
062        }
063        return createLocalClient(node);
064    }
065
066    @SuppressWarnings("resource")
067    protected ESClient createRemoteClient(ElasticSearchClientConfig config) {
068        Settings settings = getSetting(config).build();
069        log.debug("Using settings: " + settings.toDelimitedString(','));
070        TransportClient client = new PreBuiltTransportClient(settings); // not closed here
071        String[] addresses = config.getOption("addressList", "").split(",");
072        if (addresses.length == 0) {
073            throw new IllegalArgumentException("No addressList option provided cannot connect TransportClient");
074        } else {
075            for (String item : addresses) {
076                String[] address = item.split(":");
077                log.debug("Add transport address: " + item);
078                try {
079                    InetAddress inet = InetAddress.getByName(address[0]);
080                    client.addTransportAddress(new TransportAddress(inet, Integer.parseInt(address[1])));
081                } catch (UnknownHostException e) {
082                    log.error("Unable to resolve host " + address[0], e);
083                }
084            }
085        }
086        return new ESTransportClient(client);
087    }
088
089    protected ESClient createLocalClient(ElasticSearchEmbeddedNode node) {
090        log.info("Creating a TransportClient to a local Elasticsearch");
091        return new ESTransportClient(node.getNode().client());
092    }
093}