001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.client;
020
021import java.util.Arrays;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
028import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
029import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
030import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
031import org.elasticsearch.action.bulk.BulkRequest;
032import org.elasticsearch.action.bulk.BulkResponse;
033import org.elasticsearch.action.delete.DeleteRequest;
034import org.elasticsearch.action.delete.DeleteResponse;
035import org.elasticsearch.action.get.GetRequest;
036import org.elasticsearch.action.get.GetResponse;
037import org.elasticsearch.action.index.IndexRequest;
038import org.elasticsearch.action.index.IndexResponse;
039import org.elasticsearch.action.search.ClearScrollRequest;
040import org.elasticsearch.action.search.ClearScrollResponse;
041import org.elasticsearch.action.search.SearchRequest;
042import org.elasticsearch.action.search.SearchResponse;
043import org.elasticsearch.action.search.SearchScrollRequest;
044import org.elasticsearch.client.Client;
045import org.elasticsearch.client.transport.NoNodeAvailableException;
046import org.elasticsearch.cluster.health.ClusterHealthStatus;
047import org.elasticsearch.cluster.metadata.AliasMetaData;
048import org.elasticsearch.common.collect.ImmutableOpenMap;
049import org.elasticsearch.common.unit.TimeValue;
050import org.elasticsearch.common.xcontent.XContentType;
051import org.nuxeo.ecm.core.api.NuxeoException;
052import org.nuxeo.elasticsearch.api.ESClient;
053
054/**
055 * @since 9.3
056 */
057public class ESTransportClient implements ESClient {
058    private static final Log log = LogFactory.getLog(ESTransportClient.class);
059
060    protected Client client;
061
062    public ESTransportClient(Client client) {
063        this.client = client;
064    }
065
066    @Override
067    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
068        String timeout = timeoutSecond + "s";
069        log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
070        try {
071            ClusterHealthResponse response = client.admin()
072                                                   .cluster()
073                                                   .prepareHealth(indexNames)
074                                                   .setTimeout(timeout)
075                                                   .setWaitForYellowStatus()
076                                                   .get();
077            if (response.isTimedOut()) {
078                throw new NuxeoException(
079                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
080            }
081            if (response.getStatus() != ClusterHealthStatus.GREEN) {
082                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
083                return false;
084            }
085            log.info("Elasticsearch Cluster ready: " + response);
086        } catch (NoNodeAvailableException e) {
087            throw new NuxeoException(
088                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage());
089        }
090        return true;
091    }
092
093    @Override
094    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
095        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
096    }
097
098    @Override
099    public void refresh(String indexName) {
100        client.admin().indices().prepareRefresh(indexName).get();
101    }
102
103    @Override
104    public void flush(String indexName) {
105        client.admin().indices().prepareFlush(indexName).get();
106    }
107
108    @Override
109    public void optimize(String indexName) {
110        client.admin().indices().prepareForceMerge(indexName).get();
111    }
112
113    @Override
114    public boolean indexExists(String indexName) {
115        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
116    }
117
118    @Override
119    public boolean mappingExists(String indexName, String type) {
120        GetMappingsResponse mappings = client.admin()
121                                             .indices()
122                                             .prepareGetMappings(indexName)
123                                             .execute()
124                                             .actionGet();
125        if (mappings == null || mappings.getMappings().isEmpty()) {
126            return false;
127        }
128        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
129        return mappings.getMappings().values().iterator().next().value.containsKey(type);
130    }
131
132    @Override
133    public void deleteIndex(String indexName, int timeoutSecond) {
134        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
135        client.admin()
136              .indices()
137              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
138              .actionGet();
139    }
140
141    @Override
142    public void createIndex(String indexName, String jsonSettings) {
143        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
144    }
145
146    @Override
147    public void createMapping(String indexName, String type, String jsonMapping) {
148        client.admin()
149              .indices()
150              .preparePutMapping(indexName)
151              .setType(type)
152              .setSource(jsonMapping, XContentType.JSON)
153              .get();
154    }
155
156    @Override
157    public String getNodesInfo() {
158        return client.admin().cluster().prepareNodesInfo().get().toString();
159    }
160
161    @Override
162    public String getNodesStats() {
163        return client.admin().cluster().prepareNodesStats().get().toString();
164    }
165
166    @Override
167    public boolean aliasExists(String aliasName) {
168        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
169    }
170
171    @Override
172    public String getFirstIndexForAlias(String aliasName) {
173        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin().indices().prepareGetAliases(aliasName).get().getAliases();
174        if (aliases.isEmpty()) {
175            return null;
176        }
177        return aliases.keysIt().next();
178    }
179
180    @Override
181    public void updateAlias(String aliasName, String indexName) {
182        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
183        if (aliasExists(aliasName)) {
184            cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName);
185        }
186        cmd.addAlias(indexName, aliasName).execute().actionGet();
187    }
188
189    @Override
190    public BulkResponse bulk(BulkRequest request) {
191        return client.bulk(request).actionGet();
192    }
193
194    @Override
195    public DeleteResponse delete(DeleteRequest request) {
196        return client.delete(request).actionGet();
197    }
198
199    @Override
200    public SearchResponse search(SearchRequest request) {
201        return client.search(request).actionGet();
202    }
203
204    @Override
205    public SearchResponse searchScroll(SearchScrollRequest request) {
206        return client.searchScroll(request).actionGet();
207    }
208
209    @Override
210    public GetResponse get(GetRequest request) {
211        return client.get(request).actionGet();
212    }
213
214    @Override
215    public IndexResponse index(IndexRequest request) {
216        return client.index(request).actionGet();
217    }
218
219    @Override
220    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
221        return client.clearScroll(request).actionGet();
222    }
223
224    @Override
225    public void close() throws Exception {
226        if (client != null) {
227            client.close();
228            client = null;
229        }
230    }
231}