001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.List;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
029import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
030import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
031import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
032import org.elasticsearch.action.bulk.BulkProcessor;
033import org.elasticsearch.action.bulk.BulkRequest;
034import org.elasticsearch.action.bulk.BulkResponse;
035import org.elasticsearch.action.delete.DeleteRequest;
036import org.elasticsearch.action.delete.DeleteResponse;
037import org.elasticsearch.action.get.GetRequest;
038import org.elasticsearch.action.get.GetResponse;
039import org.elasticsearch.action.index.IndexRequest;
040import org.elasticsearch.action.index.IndexResponse;
041import org.elasticsearch.action.search.ClearScrollRequest;
042import org.elasticsearch.action.search.ClearScrollResponse;
043import org.elasticsearch.action.search.SearchRequest;
044import org.elasticsearch.action.search.SearchResponse;
045import org.elasticsearch.action.search.SearchScrollRequest;
046import org.elasticsearch.client.Client;
047import org.elasticsearch.client.transport.NoNodeAvailableException;
048import org.elasticsearch.cluster.health.ClusterHealthStatus;
049import org.elasticsearch.cluster.metadata.AliasMetadata;
050import org.elasticsearch.common.collect.ImmutableOpenMap;
051import org.elasticsearch.common.unit.TimeValue;
052import org.elasticsearch.common.xcontent.XContentType;
053import org.elasticsearch.index.engine.VersionConflictEngineException;
054import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
055import org.nuxeo.ecm.core.api.NuxeoException;
056import org.nuxeo.elasticsearch.api.ESClient;
057
058/**
059 * @since 9.3
060 */
061public class ESTransportClient implements ESClient {
062    private static final Log log = LogFactory.getLog(ESTransportClient.class);
063
064    protected Client client;
065
066    public ESTransportClient(Client client) {
067        this.client = client;
068    }
069
070    @Override
071    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
072        String timeout = timeoutSecond + "s";
073        log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
074        try {
075            ClusterHealthResponse response = client.admin()
076                                                   .cluster()
077                                                   .prepareHealth(indexNames)
078                                                   .setTimeout(timeout)
079                                                   .setWaitForYellowStatus()
080                                                   .get();
081            if (response.isTimedOut()) {
082                throw new NuxeoException(
083                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
084            }
085            if (response.getStatus() != ClusterHealthStatus.GREEN) {
086                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
087                return false;
088            }
089            log.info("Elasticsearch Cluster ready: " + response);
090        } catch (NoNodeAvailableException e) {
091            throw new NuxeoException(
092                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage());
093        }
094        return true;
095    }
096
097    @Override
098    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
099        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
100    }
101
102    @Override
103    public void refresh(String indexName) {
104        client.admin().indices().prepareRefresh(indexName).get();
105    }
106
107    @Override
108    public void flush(String indexName) {
109        client.admin().indices().prepareFlush(indexName).get();
110    }
111
112    @Override
113    public void optimize(String indexName) {
114        client.admin().indices().prepareForceMerge(indexName).get();
115    }
116
117    @Override
118    public boolean indexExists(String indexName) {
119        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
120    }
121
122    @Override
123    public boolean mappingExists(String indexName, String type) {
124        GetMappingsResponse mappings = client.admin().indices().prepareGetMappings(indexName).execute().actionGet();
125        if (mappings == null || mappings.getMappings().isEmpty()) {
126            return false;
127        }
128        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
129        return mappings.getMappings().values().iterator().next().value.containsKey(type);
130    }
131
132    @Override
133    public void deleteIndex(String indexName, int timeoutSecond) {
134        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
135        client.admin()
136              .indices()
137              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
138              .actionGet();
139    }
140
141    @Override
142    public void createIndex(String indexName, String jsonSettings) {
143        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
144    }
145
146    @Override
147    public void createMapping(String indexName, String type, String jsonMapping) {
148        client.admin()
149              .indices()
150              .preparePutMapping(indexName)
151              .setSource(jsonMapping, XContentType.JSON)
152              .get();
153    }
154
155    @Override
156    public String getNodesInfo() {
157        return client.admin().cluster().prepareNodesInfo().get().toString();
158    }
159
160    @Override
161    public String getNodesStats() {
162        return client.admin().cluster().prepareNodesStats().get().toString();
163    }
164
165    @Override
166    public boolean aliasExists(String aliasName) {
167        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
168    }
169
170    @Override
171    public String getFirstIndexForAlias(String aliasName) {
172        ImmutableOpenMap<String, List<AliasMetadata>> aliases = client.admin()
173                                                                      .indices()
174                                                                      .prepareGetAliases(aliasName)
175                                                                      .get()
176                                                                      .getAliases();
177        for (Iterator<String> it = aliases.keysIt(); it.hasNext();) {
178            String indexName = it.next();
179            if (!aliases.get(indexName).isEmpty()) {
180                return indexName;
181            }
182        }
183        return null;
184    }
185
186    @Override
187    public void updateAlias(String aliasName, String indexName) {
188        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
189        if (aliasExists(aliasName)) {
190            cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName);
191        }
192        cmd.addAlias(indexName, aliasName).execute().actionGet();
193    }
194
195    @Override
196    public BulkResponse bulk(BulkRequest request) {
197        return client.bulk(request).actionGet();
198    }
199
200    @Override
201    public DeleteResponse delete(DeleteRequest request) {
202        return client.delete(request).actionGet();
203    }
204
205    @Override
206    public SearchResponse search(SearchRequest request) {
207        return client.search(request).actionGet();
208    }
209
210    @Override
211    public SearchResponse searchScroll(SearchScrollRequest request) {
212        return client.searchScroll(request).actionGet();
213    }
214
215    @Override
216    public GetResponse get(GetRequest request) {
217        return client.get(request).actionGet();
218    }
219
220    @Override
221    public IndexResponse index(IndexRequest request) {
222        try {
223            return client.index(request).actionGet();
224        } catch (VersionConflictEngineException e) {
225            throw new ConcurrentUpdateException(e);
226        }
227    }
228
229    @Override
230    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
231        return client.clearScroll(request).actionGet();
232    }
233
234    @Override
235    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener listener) {
236        return BulkProcessor.builder(client, listener);
237    }
238
239    @Override
240    public void close() {
241        if (client != null) {
242            client.close();
243            client = null;
244        }
245    }
246}