001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.client; 020 021import java.util.Arrays; 022import java.util.Iterator; 023import java.util.List; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 029import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; 030import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 031import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; 032import org.elasticsearch.action.bulk.BulkProcessor; 033import org.elasticsearch.action.bulk.BulkRequest; 034import org.elasticsearch.action.bulk.BulkResponse; 035import org.elasticsearch.action.delete.DeleteRequest; 036import org.elasticsearch.action.delete.DeleteResponse; 037import org.elasticsearch.action.get.GetRequest; 038import org.elasticsearch.action.get.GetResponse; 039import org.elasticsearch.action.index.IndexRequest; 040import org.elasticsearch.action.index.IndexResponse; 041import org.elasticsearch.action.search.ClearScrollRequest; 042import org.elasticsearch.action.search.ClearScrollResponse; 043import org.elasticsearch.action.search.SearchRequest; 044import org.elasticsearch.action.search.SearchResponse; 045import org.elasticsearch.action.search.SearchScrollRequest; 046import org.elasticsearch.client.Client; 047import org.elasticsearch.client.transport.NoNodeAvailableException; 048import org.elasticsearch.cluster.health.ClusterHealthStatus; 049import org.elasticsearch.cluster.metadata.AliasMetadata; 050import org.elasticsearch.common.collect.ImmutableOpenMap; 051import org.elasticsearch.common.unit.TimeValue; 052import org.elasticsearch.common.xcontent.XContentType; 053import org.elasticsearch.index.engine.VersionConflictEngineException; 054import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.elasticsearch.api.ESClient; 057 058/** 059 * @since 9.3 060 */ 061public class ESTransportClient implements ESClient { 062 private static final Log log = LogFactory.getLog(ESTransportClient.class); 063 064 protected Client client; 065 066 public ESTransportClient(Client client) { 067 this.client = client; 068 } 069 070 @Override 071 public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) { 072 String timeout = timeoutSecond + "s"; 073 log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames)); 074 try { 075 ClusterHealthResponse response = client.admin() 076 .cluster() 077 .prepareHealth(indexNames) 078 .setTimeout(timeout) 079 .setWaitForYellowStatus() 080 .get(); 081 if (response.isTimedOut()) { 082 throw new NuxeoException( 083 "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response); 084 } 085 if (response.getStatus() != ClusterHealthStatus.GREEN) { 086 log.warn("Elasticsearch Cluster ready but not GREEN: " + response); 087 return false; 088 } 089 log.info("Elasticsearch Cluster ready: " + response); 090 } catch (NoNodeAvailableException e) { 091 throw new NuxeoException( 092 "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage()); 093 } 094 return true; 095 } 096 097 @Override 098 public ClusterHealthStatus getHealthStatus(String[] indexNames) { 099 return client.admin().cluster().prepareHealth(indexNames).get().getStatus(); 100 } 101 102 @Override 103 public void refresh(String indexName) { 104 client.admin().indices().prepareRefresh(indexName).get(); 105 } 106 107 @Override 108 public void flush(String indexName) { 109 client.admin().indices().prepareFlush(indexName).get(); 110 } 111 112 @Override 113 public void optimize(String indexName) { 114 client.admin().indices().prepareForceMerge(indexName).get(); 115 } 116 117 @Override 118 public boolean indexExists(String indexName) { 119 return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists(); 120 } 121 122 @Override 123 public boolean mappingExists(String indexName, String type) { 124 GetMappingsResponse mappings = client.admin().indices().prepareGetMappings(indexName).execute().actionGet(); 125 if (mappings == null || mappings.getMappings().isEmpty()) { 126 return false; 127 } 128 // The real index might have another name if indexName is an alias so we check the mapping of the first item. 129 return mappings.getMappings().values().iterator().next().value.containsKey(type); 130 } 131 132 @Override 133 public void deleteIndex(String indexName, int timeoutSecond) { 134 TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS); 135 client.admin() 136 .indices() 137 .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout)) 138 .actionGet(); 139 } 140 141 @Override 142 public void createIndex(String indexName, String jsonSettings) { 143 client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get(); 144 } 145 146 @Override 147 public void createMapping(String indexName, String type, String jsonMapping) { 148 client.admin() 149 .indices() 150 .preparePutMapping(indexName) 151 .setSource(jsonMapping, XContentType.JSON) 152 .get(); 153 } 154 155 @Override 156 public String getNodesInfo() { 157 return client.admin().cluster().prepareNodesInfo().get().toString(); 158 } 159 160 @Override 161 public String getNodesStats() { 162 return client.admin().cluster().prepareNodesStats().get().toString(); 163 } 164 165 @Override 166 public boolean aliasExists(String aliasName) { 167 return client.admin().indices().prepareAliasesExist(aliasName).get().isExists(); 168 } 169 170 @Override 171 public String getFirstIndexForAlias(String aliasName) { 172 ImmutableOpenMap<String, List<AliasMetadata>> aliases = client.admin() 173 .indices() 174 .prepareGetAliases(aliasName) 175 .get() 176 .getAliases(); 177 for (Iterator<String> it = aliases.keysIt(); it.hasNext();) { 178 String indexName = it.next(); 179 if (!aliases.get(indexName).isEmpty()) { 180 return indexName; 181 } 182 } 183 return null; 184 } 185 186 @Override 187 public void updateAlias(String aliasName, String indexName) { 188 IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases(); 189 if (aliasExists(aliasName)) { 190 cmd.removeAlias(getFirstIndexForAlias(aliasName), aliasName); 191 } 192 cmd.addAlias(indexName, aliasName).execute().actionGet(); 193 } 194 195 @Override 196 public BulkResponse bulk(BulkRequest request) { 197 return client.bulk(request).actionGet(); 198 } 199 200 @Override 201 public DeleteResponse delete(DeleteRequest request) { 202 return client.delete(request).actionGet(); 203 } 204 205 @Override 206 public SearchResponse search(SearchRequest request) { 207 return client.search(request).actionGet(); 208 } 209 210 @Override 211 public SearchResponse searchScroll(SearchScrollRequest request) { 212 return client.searchScroll(request).actionGet(); 213 } 214 215 @Override 216 public GetResponse get(GetRequest request) { 217 return client.get(request).actionGet(); 218 } 219 220 @Override 221 public IndexResponse index(IndexRequest request) { 222 try { 223 return client.index(request).actionGet(); 224 } catch (VersionConflictEngineException e) { 225 throw new ConcurrentUpdateException(e); 226 } 227 } 228 229 @Override 230 public ClearScrollResponse clearScroll(ClearScrollRequest request) { 231 return client.clearScroll(request).actionGet(); 232 } 233 234 @Override 235 public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener listener) { 236 return BulkProcessor.builder(client, listener); 237 } 238 239 @Override 240 public void close() { 241 if (client != null) { 242 client.close(); 243 client = null; 244 } 245 } 246}