/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
019package org.nuxeo.elasticsearch.client;
020
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.List;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
029import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
030import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
031import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
032import org.elasticsearch.action.bulk.BulkRequest;
033import org.elasticsearch.action.bulk.BulkResponse;
034import org.elasticsearch.action.delete.DeleteRequest;
035import org.elasticsearch.action.delete.DeleteResponse;
036import org.elasticsearch.action.get.GetRequest;
037import org.elasticsearch.action.get.GetResponse;
038import org.elasticsearch.action.index.IndexRequest;
039import org.elasticsearch.action.index.IndexResponse;
040import org.elasticsearch.action.search.ClearScrollRequest;
041import org.elasticsearch.action.search.ClearScrollResponse;
042import org.elasticsearch.action.search.SearchRequest;
043import org.elasticsearch.action.search.SearchResponse;
044import org.elasticsearch.action.search.SearchScrollRequest;
045import org.elasticsearch.client.Client;
046import org.elasticsearch.client.transport.NoNodeAvailableException;
047import org.elasticsearch.cluster.health.ClusterHealthStatus;
048import org.elasticsearch.cluster.metadata.AliasMetaData;
049import org.elasticsearch.common.collect.ImmutableOpenMap;
050import org.elasticsearch.common.unit.TimeValue;
051import org.elasticsearch.common.xcontent.XContentType;
052import org.elasticsearch.index.engine.VersionConflictEngineException;
053import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
054import org.nuxeo.ecm.core.api.NuxeoException;
055import org.nuxeo.elasticsearch.api.ESClient;
056
057/**
058 * @since 9.3
059 */
060public class ESTransportClient implements ESClient {
061    private static final Log log = LogFactory.getLog(ESTransportClient.class);
062
063    protected Client client;
064
065    public ESTransportClient(Client client) {
066        this.client = client;
067    }
068
069    @Override
070    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
071        String timeout = timeoutSecond + "s";
072        log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
073        try {
074            ClusterHealthResponse response = client.admin()
075                                                   .cluster()
076                                                   .prepareHealth(indexNames)
077                                                   .setTimeout(timeout)
078                                                   .setWaitForYellowStatus()
079                                                   .get();
080            if (response.isTimedOut()) {
081                throw new NuxeoException(
082                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
083            }
084            if (response.getStatus() != ClusterHealthStatus.GREEN) {
085                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
086                return false;
087            }
088            log.info("Elasticsearch Cluster ready: " + response);
089        } catch (NoNodeAvailableException e) {
090            throw new NuxeoException(
091                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage());
092        }
093        return true;
094    }
095
096    @Override
097    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
098        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
099    }
100
101    @Override
102    public void refresh(String indexName) {
103        client.admin().indices().prepareRefresh(indexName).get();
104    }
105
106    @Override
107    public void flush(String indexName) {
108        client.admin().indices().prepareFlush(indexName).get();
109    }
110
111    @Override
112    public void optimize(String indexName) {
113        client.admin().indices().prepareForceMerge(indexName).get();
114    }
115
116    @Override
117    public boolean indexExists(String indexName) {
118        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
119    }
120
121    @Override
122    public boolean mappingExists(String indexName, String type) {
123        GetMappingsResponse mappings = client.admin()
124                                             .indices()
125                                             .prepareGetMappings(indexName)
126                                             .execute()
127                                             .actionGet();
128        if (mappings == null || mappings.getMappings().isEmpty()) {
129            return false;
130        }
131        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
132        return mappings.getMappings().values().iterator().next().value.containsKey(type);
133    }
134
135    @Override
136    public void deleteIndex(String indexName, int timeoutSecond) {
137        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
138        client.admin()
139              .indices()
140              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
141              .actionGet();
142    }
143
144    @Override
145    public void createIndex(String indexName, String jsonSettings) {
146        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
147    }
148
149    @Override
150    public void createMapping(String indexName, String type, String jsonMapping) {
151        client.admin()
152              .indices()
153              .preparePutMapping(indexName)
154              .setType(type)
155              .setSource(jsonMapping, XContentType.JSON)
156              .get();
157    }
158
159    @Override
160    public String getNodesInfo() {
161        return client.admin().cluster().prepareNodesInfo().get().toString();
162    }
163
164    @Override
165    public String getNodesStats() {
166        return client.admin().cluster().prepareNodesStats().get().toString();
167    }
168
169    @Override
170    public boolean aliasExists(String aliasName) {
171        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
172    }
173
174    @Override
175    public String getFirstIndexForAlias(String aliasName) {
176        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin()
177                                                                      .indices()
178                                                                      .prepareGetAliases(aliasName)
179                                                                      .get()
180                                                                      .getAliases();
181        for (Iterator<String> it = aliases.keysIt(); it.hasNext();) {
182            String indexName = it.next();
183            if (!aliases.get(indexName).isEmpty()) {
184                return indexName;
185            }
186        }
187        return null;
188    }
189
190    @Override
191    public void updateAlias(String aliasName, String indexName) {
192        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
193        if (aliasExists(aliasName)) {
194            cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName);
195        }
196        cmd.addAlias(indexName, aliasName).execute().actionGet();
197    }
198
199    @Override
200    public BulkResponse bulk(BulkRequest request) {
201        return client.bulk(request).actionGet();
202    }
203
204    @Override
205    public DeleteResponse delete(DeleteRequest request) {
206        return client.delete(request).actionGet();
207    }
208
209    @Override
210    public SearchResponse search(SearchRequest request) {
211        return client.search(request).actionGet();
212    }
213
214    @Override
215    public SearchResponse searchScroll(SearchScrollRequest request) {
216        return client.searchScroll(request).actionGet();
217    }
218
219    @Override
220    public GetResponse get(GetRequest request) {
221        return client.get(request).actionGet();
222    }
223
224    @Override
225    public IndexResponse index(IndexRequest request) {
226        try {
227            return client.index(request).actionGet();
228        } catch (VersionConflictEngineException e) {
229            throw new ConcurrentUpdateException(e);
230        }
231    }
232
233    @Override
234    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
235        return client.clearScroll(request).actionGet();
236    }
237
238    @Override
239    public void close() {
240        if (client != null) {
241            client.close();
242            client = null;
243        }
244    }
245}