001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.client; 020 021import java.util.Arrays; 022import java.util.Iterator; 023import java.util.List; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 029import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; 030import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 031import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; 032import org.elasticsearch.action.bulk.BulkProcessor; 033import org.elasticsearch.action.bulk.BulkRequest; 034import org.elasticsearch.action.bulk.BulkResponse; 035import org.elasticsearch.action.delete.DeleteRequest; 036import org.elasticsearch.action.delete.DeleteResponse; 037import org.elasticsearch.action.get.GetRequest; 038import org.elasticsearch.action.get.GetResponse; 039import org.elasticsearch.action.index.IndexRequest; 040import org.elasticsearch.action.index.IndexResponse; 041import org.elasticsearch.action.search.ClearScrollRequest; 042import org.elasticsearch.action.search.ClearScrollResponse; 043import org.elasticsearch.action.search.SearchRequest; 044import org.elasticsearch.action.search.SearchResponse; 045import org.elasticsearch.action.search.SearchScrollRequest; 046import org.elasticsearch.client.Client; 047import org.elasticsearch.client.transport.NoNodeAvailableException; 048import org.elasticsearch.cluster.health.ClusterHealthStatus; 049import org.elasticsearch.cluster.metadata.AliasMetaData; 050import org.elasticsearch.common.collect.ImmutableOpenMap; 051import org.elasticsearch.common.unit.TimeValue; 052import org.elasticsearch.common.xcontent.XContentType; 053import org.elasticsearch.index.engine.VersionConflictEngineException; 054import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.elasticsearch.api.ESClient; 057 058/** 059 * @since 9.3 060 */ 061public class ESTransportClient implements ESClient { 062 private static final Log log = LogFactory.getLog(ESTransportClient.class); 063 064 protected Client client; 065 066 public ESTransportClient(Client client) { 067 this.client = client; 068 } 069 070 @Override 071 public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) { 072 String timeout = timeoutSecond + "s"; 073 log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames)); 074 try { 075 ClusterHealthResponse response = client.admin() 076 .cluster() 077 .prepareHealth(indexNames) 078 .setTimeout(timeout) 079 .setWaitForYellowStatus() 080 .get(); 081 if (response.isTimedOut()) { 082 throw new NuxeoException( 083 "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response); 084 } 085 if (response.getStatus() != ClusterHealthStatus.GREEN) { 086 log.warn("Elasticsearch Cluster ready but not GREEN: " + response); 087 return false; 088 } 089 log.info("Elasticsearch Cluster ready: " + response); 090 } catch (NoNodeAvailableException e) { 091 throw new NuxeoException( 092 "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage()); 093 } 094 return true; 095 } 096 097 @Override 098 public ClusterHealthStatus getHealthStatus(String[] indexNames) { 099 return client.admin().cluster().prepareHealth(indexNames).get().getStatus(); 100 } 101 102 @Override 103 public void refresh(String indexName) { 104 client.admin().indices().prepareRefresh(indexName).get(); 105 } 106 107 @Override 108 public void flush(String indexName) { 109 client.admin().indices().prepareFlush(indexName).get(); 110 } 111 112 @Override 113 public void optimize(String indexName) { 114 client.admin().indices().prepareForceMerge(indexName).get(); 115 } 116 117 @Override 118 public boolean indexExists(String indexName) { 119 return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists(); 120 } 121 122 @Override 123 public boolean mappingExists(String indexName, String type) { 124 GetMappingsResponse mappings = client.admin() 125 .indices() 126 .prepareGetMappings(indexName) 127 .execute() 128 .actionGet(); 129 if (mappings == null || mappings.getMappings().isEmpty()) { 130 return false; 131 } 132 // The real index might have another name if indexName is an alias so we check the mapping of the first item. 133 return mappings.getMappings().values().iterator().next().value.containsKey(type); 134 } 135 136 @Override 137 public void deleteIndex(String indexName, int timeoutSecond) { 138 TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS); 139 client.admin() 140 .indices() 141 .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout)) 142 .actionGet(); 143 } 144 145 @Override 146 public void createIndex(String indexName, String jsonSettings) { 147 client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get(); 148 } 149 150 @Override 151 public void createMapping(String indexName, String type, String jsonMapping) { 152 client.admin() 153 .indices() 154 .preparePutMapping(indexName) 155 .setType(type) 156 .setSource(jsonMapping, XContentType.JSON) 157 .get(); 158 } 159 160 @Override 161 public String getNodesInfo() { 162 return client.admin().cluster().prepareNodesInfo().get().toString(); 163 } 164 165 @Override 166 public String getNodesStats() { 167 return client.admin().cluster().prepareNodesStats().get().toString(); 168 } 169 170 @Override 171 public boolean aliasExists(String aliasName) { 172 return client.admin().indices().prepareAliasesExist(aliasName).get().isExists(); 173 } 174 175 @Override 176 public String getFirstIndexForAlias(String aliasName) { 177 ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin() 178 .indices() 179 .prepareGetAliases(aliasName) 180 .get() 181 .getAliases(); 182 for (Iterator<String> it = aliases.keysIt(); it.hasNext();) { 183 String indexName = it.next(); 184 if (!aliases.get(indexName).isEmpty()) { 185 return indexName; 186 } 187 } 188 return null; 189 } 190 191 @Override 192 public void updateAlias(String aliasName, String indexName) { 193 IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases(); 194 if (aliasExists(aliasName)) { 195 cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName); 196 } 197 cmd.addAlias(indexName, aliasName).execute().actionGet(); 198 } 199 200 @Override 201 public BulkResponse bulk(BulkRequest request) { 202 return client.bulk(request).actionGet(); 203 } 204 205 @Override 206 public DeleteResponse delete(DeleteRequest request) { 207 return client.delete(request).actionGet(); 208 } 209 210 @Override 211 public SearchResponse search(SearchRequest request) { 212 return client.search(request).actionGet(); 213 } 214 215 @Override 216 public SearchResponse searchScroll(SearchScrollRequest request) { 217 return client.searchScroll(request).actionGet(); 218 } 219 220 @Override 221 public GetResponse get(GetRequest request) { 222 return client.get(request).actionGet(); 223 } 224 225 @Override 226 public IndexResponse index(IndexRequest request) { 227 try { 228 return client.index(request).actionGet(); 229 } catch (VersionConflictEngineException e) { 230 throw new ConcurrentUpdateException(e); 231 } 232 } 233 234 @Override 235 public ClearScrollResponse clearScroll(ClearScrollRequest request) { 236 return client.clearScroll(request).actionGet(); 237 } 238 239 @Override 240 public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener listener) { 241 return BulkProcessor.builder(client, listener); 242 } 243 244 @Override 245 public void close() { 246 if (client != null) { 247 client.close(); 248 client = null; 249 } 250 } 251}