/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.client;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ESClient;

/**
 * {@link ESClient} implementation that delegates every operation to an Elasticsearch {@link Client}.
 *
 * @since 9.3
 */
public class ESTransportClient implements ESClient {
    private static final Log log = LogFactory.getLog(ESTransportClient.class);

    /** Underlying Elasticsearch client; set to {@code null} by {@link #close()}. */
    protected Client client;

    /**
     * Wraps the given Elasticsearch client.
     *
     * @param client the client every operation is delegated to
     */
    public ESTransportClient(Client client) {
        this.client = client;
    }

    /**
     * Asks the cluster for the health of the given indexes, waiting for at least a yellow status.
     *
     * @param indexNames the indexes to check
     * @param timeoutSecond the health request timeout, in seconds
     * @return {@code true} when the reported status is GREEN, {@code false} when the cluster answered in time but
     *         is not GREEN
     * @throws NuxeoException if the health request timed out or no Elasticsearch node could be reached
     */
    @Override
    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
        String timeout = timeoutSecond + "s";
        // Only build the message (including the array dump) when it will actually be logged.
        if (log.isDebugEnabled()) {
            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
        }
        try {
            ClusterHealthResponse response = client.admin()
                                                   .cluster()
                                                   .prepareHealth(indexNames)
                                                   .setTimeout(timeout)
                                                   .setWaitForYellowStatus()
                                                   .get();
            if (response.isTimedOut()) {
                throw new NuxeoException(
                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
            }
            if (response.getStatus() != ClusterHealthStatus.GREEN) {
                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
                return false;
            }
            log.info("Elasticsearch Cluster ready: " + response);
        } catch (NoNodeAvailableException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new NuxeoException(
                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(), e);
        }
        return true;
    }

    /**
     * Returns the cluster health status reported for the given indexes.
     *
     * @param indexNames the indexes to check
     * @return the status found in the cluster health response
     */
    @Override
    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
    }

    /**
     * Issues a refresh request on the given index and waits for the response.
     */
    @Override
    public void refresh(String indexName) {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    /**
     * Issues a flush request on the given index and waits for the response.
     */
    @Override
    public void flush(String indexName) {
        client.admin().indices().prepareFlush(indexName).get();
    }

    /**
     * Issues a force-merge request on the given index and waits for the response.
     */
    @Override
    public void optimize(String indexName) {
        client.admin().indices().prepareForceMerge(indexName).get();
    }

    /**
     * Tells whether the indices-exists request for the given name reports an existing index.
     */
    @Override
    public boolean indexExists(String indexName) {
        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
    }

    /**
     * Tells whether a mapping for the given type is present on the given index.
     *
     * @param indexName the index (or alias) name used for the get-mappings request
     * @param type the mapping type to look for
     * @return {@code false} when the response is {@code null} or holds no mapping, otherwise whether the first
     *         returned entry contains {@code type}
     */
    @Override
    public boolean mappingExists(String indexName, String type) {
        GetMappingsResponse mappings = client.admin()
                                             .indices()
                                             .prepareGetMappings(indexName)
                                             .execute()
                                             .actionGet();
        if (mappings == null || mappings.getMappings().isEmpty()) {
            return false;
        }
        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
        return mappings.getMappings().values().iterator().next().value.containsKey(type);
    }

    /**
     * Deletes the given index.
     *
     * @param indexName the index to delete
     * @param timeoutSecond the value, in seconds, applied to both the request timeout and the master node timeout
     */
    @Override
    public void deleteIndex(String indexName, int timeoutSecond) {
        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
        client.admin()
              .indices()
              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
              .actionGet();
    }

    /**
     * Creates an index with the given settings.
     *
     * @param indexName the index to create
     * @param jsonSettings the index settings, passed to Elasticsearch as JSON
     */
    @Override
    public void createIndex(String indexName, String jsonSettings) {
        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
    }

    /**
     * Puts a mapping for the given type on the given index.
     *
     * @param indexName the target index
     * @param type the mapping type
     * @param jsonMapping the mapping source, passed to Elasticsearch as JSON
     */
    @Override
    public void createMapping(String indexName, String type, String jsonMapping) {
        client.admin()
              .indices()
              .preparePutMapping(indexName)
              .setType(type)
              .setSource(jsonMapping, XContentType.JSON)
              .get();
    }

    /**
     * Returns the string form of the nodes-info response.
     */
    @Override
    public String getNodesInfo() {
        return client.admin().cluster().prepareNodesInfo().get().toString();
    }

    /**
     * Returns the string form of the nodes-stats response.
     */
    @Override
    public String getNodesStats() {
        return client.admin().cluster().prepareNodesStats().get().toString();
    }

    /**
     * Tells whether the aliases-exist request for the given alias name reports an existing alias.
     */
    @Override
    public boolean aliasExists(String aliasName) {
        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
    }

    /**
     * Looks up the indexes returned by a get-aliases request for the given alias.
     *
     * @param aliasName the alias to resolve
     * @return the first index name whose alias metadata list is not empty, or {@code null} when there is none
     */
    @Override
    public String getFirstIndexForAlias(String aliasName) {
        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin()
                                                                      .indices()
                                                                      .prepareGetAliases(aliasName)
                                                                      .get()
                                                                      .getAliases();
        for (Iterator<String> it = aliases.keysIt(); it.hasNext();) {
            String indexName = it.next();
            if (!aliases.get(indexName).isEmpty()) {
                return indexName;
            }
        }
        return null;
    }

    /**
     * Points the alias to the given index in a single aliases request: when the alias already exists it is removed
     * from the index returned by {@link #getFirstIndexForAlias(String)}, then added to {@code indexName}.
     *
     * @param aliasName the alias to update
     * @param indexName the index the alias must be added to
     */
    @Override
    public void updateAlias(String aliasName, String indexName) {
        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
        if (aliasExists(aliasName)) {
            String currentIndex = getFirstIndexForAlias(aliasName);
            // getFirstIndexForAlias may return null; never hand a null index name to removeAlias.
            if (currentIndex != null) {
                cmd.removeAlias(currentIndex, aliasName);
            }
        }
        cmd.addAlias(indexName, aliasName).execute().actionGet();
    }

    /**
     * Executes the bulk request and waits for its response.
     */
    @Override
    public BulkResponse bulk(BulkRequest request) {
        return client.bulk(request).actionGet();
    }

    /**
     * Executes the delete request and waits for its response.
     */
    @Override
    public DeleteResponse delete(DeleteRequest request) {
        return client.delete(request).actionGet();
    }

    /**
     * Executes the search request and waits for its response.
     */
    @Override
    public SearchResponse search(SearchRequest request) {
        return client.search(request).actionGet();
    }

    /**
     * Executes the search-scroll request and waits for its response.
     */
    @Override
    public SearchResponse searchScroll(SearchScrollRequest request) {
        return client.searchScroll(request).actionGet();
    }

    /**
     * Executes the get request and waits for its response.
     */
    @Override
    public GetResponse get(GetRequest request) {
        return client.get(request).actionGet();
    }

    /**
     * Executes the index request and waits for its response.
     *
     * @throws ConcurrentUpdateException wrapping the {@link VersionConflictEngineException} raised by the request
     */
    @Override
    public IndexResponse index(IndexRequest request) {
        try {
            return client.index(request).actionGet();
        } catch (VersionConflictEngineException e) {
            throw new ConcurrentUpdateException(e);
        }
    }

    /**
     * Executes the clear-scroll request and waits for its response.
     */
    @Override
    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
        return client.clearScroll(request).actionGet();
    }

    /**
     * Closes the underlying client, if any, and clears the reference so that a repeated call does nothing.
     */
    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}