/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.client;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ESClient;

/**
 * {@link ESClient} implementation that forwards every operation to a wrapped Elasticsearch {@link Client}.
 *
 * @since 9.3
 */
public class ESTransportClient implements ESClient {
    private static final Log log = LogFactory.getLog(ESTransportClient.class);

    /** Wrapped Elasticsearch client; reset to {@code null} by {@link #close()}. */
    protected Client client;

    /**
     * Wraps the given Elasticsearch client.
     *
     * @param client the client that all operations are delegated to
     */
    public ESTransportClient(Client client) {
        this.client = client;
    }

    /**
     * Waits until the cluster health for the given indexes reaches at least yellow status.
     *
     * @param indexNames the indexes whose health is checked
     * @param timeoutSecond the maximum time to wait, in seconds
     * @return {@code true} when the reported status is GREEN, {@code false} when the wait succeeded but the status
     *         is not GREEN
     * @throws NuxeoException if the health request timed out, or if no Elasticsearch node could be reached
     */
    @Override
    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
        String timeout = timeoutSecond + "s";
        if (log.isDebugEnabled()) {
            // Guarded so the array is only rendered when the message is actually emitted.
            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
        }
        try {
            ClusterHealthResponse response = client.admin()
                                                   .cluster()
                                                   .prepareHealth(indexNames)
                                                   .setTimeout(timeout)
                                                   .setWaitForYellowStatus()
                                                   .get();
            if (response.isTimedOut()) {
                throw new NuxeoException(
                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
            }
            if (response.getStatus() != ClusterHealthStatus.GREEN) {
                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
                return false;
            }
            log.info("Elasticsearch Cluster ready: " + response);
        } catch (NoNodeAvailableException e) {
            // Keep the original exception as cause so the underlying stack trace is not lost.
            throw new NuxeoException(
                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(), e);
        }
        return true;
    }

    /**
     * Returns the cluster health status reported for the given indexes.
     *
     * @param indexNames the indexes whose health is requested
     * @return the status found in the cluster health response
     */
    @Override
    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
    }

    /**
     * Issues a refresh request on the given index and waits for the response.
     *
     * @param indexName the index to refresh
     */
    @Override
    public void refresh(String indexName) {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    /**
     * Issues a flush request on the given index and waits for the response.
     *
     * @param indexName the index to flush
     */
    @Override
    public void flush(String indexName) {
        client.admin().indices().prepareFlush(indexName).get();
    }

    /**
     * Issues a force-merge request on the given index and waits for the response.
     *
     * @param indexName the index to force-merge
     */
    @Override
    public void optimize(String indexName) {
        client.admin().indices().prepareForceMerge(indexName).get();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName the index name to look up
     * @return {@code true} if the exists response reports the index as existing
     */
    @Override
    public boolean indexExists(String indexName) {
        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
    }

    /**
     * Checks whether a mapping for the given type is present on an index.
     *
     * @param indexName the index name (possibly an alias) to look up
     * @param type the mapping type to search for
     * @return {@code false} when no mappings are returned, otherwise whether the first returned index holds a
     *         mapping for {@code type}
     */
    @Override
    public boolean mappingExists(String indexName, String type) {
        GetMappingsResponse mappings = client.admin()
                                             .indices()
                                             .prepareGetMappings(indexName)
                                             .execute()
                                             .actionGet();
        if (mappings == null || mappings.getMappings().isEmpty()) {
            return false;
        }
        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
        return mappings.getMappings().values().iterator().next().value.containsKey(type);
    }

    /**
     * Deletes an index, applying the same timeout to the request and to the master node.
     *
     * @param indexName the index to delete
     * @param timeoutSecond the timeout, in seconds
     */
    @Override
    public void deleteIndex(String indexName, int timeoutSecond) {
        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
        client.admin()
              .indices()
              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
              .actionGet();
    }

    /**
     * Creates an index with the given settings.
     *
     * @param indexName the name of the index to create
     * @param jsonSettings the index settings, passed to Elasticsearch as JSON
     */
    @Override
    public void createIndex(String indexName, String jsonSettings) {
        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
    }

    /**
     * Puts a mapping for the given type on an index.
     *
     * @param indexName the target index
     * @param type the mapping type
     * @param jsonMapping the mapping source, passed to Elasticsearch as JSON
     */
    @Override
    public void createMapping(String indexName, String type, String jsonMapping) {
        client.admin()
              .indices()
              .preparePutMapping(indexName)
              .setType(type)
              .setSource(jsonMapping, XContentType.JSON)
              .get();
    }

    /**
     * Returns the string form of the cluster nodes-info response.
     *
     * @return the result of {@code toString()} on the nodes-info response
     */
    @Override
    public String getNodesInfo() {
        return client.admin().cluster().prepareNodesInfo().get().toString();
    }

    /**
     * Returns the string form of the cluster nodes-stats response.
     *
     * @return the result of {@code toString()} on the nodes-stats response
     */
    @Override
    public String getNodesStats() {
        return client.admin().cluster().prepareNodesStats().get().toString();
    }

    /**
     * Checks whether an alias exists.
     *
     * @param aliasName the alias to look up
     * @return {@code true} if the aliases-exist response reports the alias as existing
     */
    @Override
    public boolean aliasExists(String aliasName) {
        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
    }

    /**
     * Returns the first index name found for the given alias.
     *
     * @param aliasName the alias to resolve
     * @return the first key of the get-aliases response, or {@code null} when the response holds no entry
     */
    @Override
    public String getFirstIndexForAlias(String aliasName) {
        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin()
                                                                      .indices()
                                                                      .prepareGetAliases(aliasName)
                                                                      .get()
                                                                      .getAliases();
        if (aliases.isEmpty()) {
            return null;
        }
        return aliases.keysIt().next();
    }

    /**
     * Points an alias to the given index, removing it first from the index it currently resolves to when the alias
     * already exists. Removal and addition are submitted in a single aliases request.
     *
     * @param aliasName the alias to update
     * @param indexName the index the alias must point to
     */
    @Override
    public void updateAlias(String aliasName, String indexName) {
        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
        if (aliasExists(aliasName)) {
            cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName);
        }
        cmd.addAlias(indexName, aliasName).execute().actionGet();
    }

    /**
     * Executes a bulk request and waits for its response.
     *
     * @param request the bulk request
     * @return the bulk response
     */
    @Override
    public BulkResponse bulk(BulkRequest request) {
        return client.bulk(request).actionGet();
    }

    /**
     * Executes a delete request and waits for its response.
     *
     * @param request the delete request
     * @return the delete response
     */
    @Override
    public DeleteResponse delete(DeleteRequest request) {
        return client.delete(request).actionGet();
    }

    /**
     * Executes a search request and waits for its response.
     *
     * @param request the search request
     * @return the search response
     */
    @Override
    public SearchResponse search(SearchRequest request) {
        return client.search(request).actionGet();
    }

    /**
     * Executes a search-scroll request and waits for its response.
     *
     * @param request the scroll request
     * @return the search response for the scroll
     */
    @Override
    public SearchResponse searchScroll(SearchScrollRequest request) {
        return client.searchScroll(request).actionGet();
    }

    /**
     * Executes a get request and waits for its response.
     *
     * @param request the get request
     * @return the get response
     */
    @Override
    public GetResponse get(GetRequest request) {
        return client.get(request).actionGet();
    }

    /**
     * Executes an index request and waits for its response.
     *
     * @param request the index request
     * @return the index response
     */
    @Override
    public IndexResponse index(IndexRequest request) {
        return client.index(request).actionGet();
    }

    /**
     * Executes a clear-scroll request and waits for its response.
     *
     * @param request the clear-scroll request
     * @return the clear-scroll response
     */
    @Override
    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
        return client.clearScroll(request).actionGet();
    }

    /**
     * Closes the wrapped client, if any, and drops the reference so later calls to this method do nothing.
     */
    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}