001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.List;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
029import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
030import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
031import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
032import org.elasticsearch.action.bulk.BulkProcessor;
033import org.elasticsearch.action.bulk.BulkRequest;
034import org.elasticsearch.action.bulk.BulkResponse;
035import org.elasticsearch.action.delete.DeleteRequest;
036import org.elasticsearch.action.delete.DeleteResponse;
037import org.elasticsearch.action.get.GetRequest;
038import org.elasticsearch.action.get.GetResponse;
039import org.elasticsearch.action.index.IndexRequest;
040import org.elasticsearch.action.index.IndexResponse;
041import org.elasticsearch.action.search.ClearScrollRequest;
042import org.elasticsearch.action.search.ClearScrollResponse;
043import org.elasticsearch.action.search.SearchRequest;
044import org.elasticsearch.action.search.SearchResponse;
045import org.elasticsearch.action.search.SearchScrollRequest;
046import org.elasticsearch.client.Client;
047import org.elasticsearch.client.transport.NoNodeAvailableException;
048import org.elasticsearch.cluster.health.ClusterHealthStatus;
049import org.elasticsearch.cluster.metadata.AliasMetaData;
050import org.elasticsearch.common.collect.ImmutableOpenMap;
051import org.elasticsearch.common.unit.TimeValue;
052import org.elasticsearch.common.xcontent.XContentType;
053import org.elasticsearch.index.engine.VersionConflictEngineException;
054import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
055import org.nuxeo.ecm.core.api.NuxeoException;
056import org.nuxeo.elasticsearch.api.ESClient;
057
058/**
059 * @since 9.3
060 */
061public class ESTransportClient implements ESClient {
062    private static final Log log = LogFactory.getLog(ESTransportClient.class);
063
064    protected Client client;
065
066    public ESTransportClient(Client client) {
067        this.client = client;
068    }
069
070    @Override
071    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
072        String timeout = timeoutSecond + "s";
073        log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
074        try {
075            ClusterHealthResponse response = client.admin()
076                                                   .cluster()
077                                                   .prepareHealth(indexNames)
078                                                   .setTimeout(timeout)
079                                                   .setWaitForYellowStatus()
080                                                   .get();
081            if (response.isTimedOut()) {
082                throw new NuxeoException(
083                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
084            }
085            if (response.getStatus() != ClusterHealthStatus.GREEN) {
086                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
087                return false;
088            }
089            log.info("Elasticsearch Cluster ready: " + response);
090        } catch (NoNodeAvailableException e) {
091            throw new NuxeoException(
092                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage());
093        }
094        return true;
095    }
096
097    @Override
098    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
099        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
100    }
101
102    @Override
103    public void refresh(String indexName) {
104        client.admin().indices().prepareRefresh(indexName).get();
105    }
106
107    @Override
108    public void flush(String indexName) {
109        client.admin().indices().prepareFlush(indexName).get();
110    }
111
112    @Override
113    public void optimize(String indexName) {
114        client.admin().indices().prepareForceMerge(indexName).get();
115    }
116
117    @Override
118    public boolean indexExists(String indexName) {
119        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
120    }
121
122    @Override
123    public boolean mappingExists(String indexName, String type) {
124        GetMappingsResponse mappings = client.admin().indices().prepareGetMappings(indexName).execute().actionGet();
125        if (mappings == null || mappings.getMappings().isEmpty()) {
126            return false;
127        }
128        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
129        return mappings.getMappings().values().iterator().next().value.containsKey(type);
130    }
131
132    @Override
133    public void deleteIndex(String indexName, int timeoutSecond) {
134        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
135        client.admin()
136              .indices()
137              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
138              .actionGet();
139    }
140
141    @Override
142    public void createIndex(String indexName, String jsonSettings) {
143        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
144    }
145
146    @Override
147    public void createMapping(String indexName, String type, String jsonMapping) {
148        client.admin()
149              .indices()
150              .preparePutMapping(indexName)
151              .setType(type)
152              .setSource(jsonMapping, XContentType.JSON)
153              .get();
154    }
155
156    @Override
157    public String getNodesInfo() {
158        return client.admin().cluster().prepareNodesInfo().get().toString();
159    }
160
161    @Override
162    public String getNodesStats() {
163        return client.admin().cluster().prepareNodesStats().get().toString();
164    }
165
166    @Override
167    public boolean aliasExists(String aliasName) {
168        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
169    }
170
171    @Override
172    public String getFirstIndexForAlias(String aliasName) {
173        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin()
174                                                                      .indices()
175                                                                      .prepareGetAliases(aliasName)
176                                                                      .get()
177                                                                      .getAliases();
178        for (Iterator<String> it = aliases.keysIt(); it.hasNext();) {
179            String indexName = it.next();
180            if (!aliases.get(indexName).isEmpty()) {
181                return indexName;
182            }
183        }
184        return null;
185    }
186
187    @Override
188    public void updateAlias(String aliasName, String indexName) {
189        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
190        if (aliasExists(aliasName)) {
191            cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName);
192        }
193        cmd.addAlias(indexName, aliasName).execute().actionGet();
194    }
195
196    @Override
197    public BulkResponse bulk(BulkRequest request) {
198        return client.bulk(request).actionGet();
199    }
200
201    @Override
202    public DeleteResponse delete(DeleteRequest request) {
203        return client.delete(request).actionGet();
204    }
205
206    @Override
207    public SearchResponse search(SearchRequest request) {
208        return client.search(request).actionGet();
209    }
210
211    @Override
212    public SearchResponse searchScroll(SearchScrollRequest request) {
213        return client.searchScroll(request).actionGet();
214    }
215
216    @Override
217    public GetResponse get(GetRequest request) {
218        return client.get(request).actionGet();
219    }
220
221    @Override
222    public IndexResponse index(IndexRequest request) {
223        try {
224            return client.index(request).actionGet();
225        } catch (VersionConflictEngineException e) {
226            throw new ConcurrentUpdateException(e);
227        }
228    }
229
230    @Override
231    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
232        return client.clearScroll(request).actionGet();
233    }
234
235    @Override
236    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener listener) {
237        return BulkProcessor.builder(client, listener);
238    }
239
240    @Override
241    public void close() {
242        if (client != null) {
243            client.close();
244            client = null;
245        }
246    }
247}