/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.client;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.elasticsearch.api.ESClient;

/**
 * {@link ESClient} implementation that delegates every operation to a wrapped Elasticsearch {@link Client}.
 *
 * @since 9.3
 */
public class ESTransportClient implements ESClient {
    private static final Log log = LogFactory.getLog(ESTransportClient.class);

    /** The wrapped Elasticsearch client; set to {@code null} once {@link #close()} has been called. */
    protected Client client;

    /**
     * Creates a client that delegates to the given Elasticsearch client.
     *
     * @param client the Elasticsearch client to delegate to
     */
    public ESTransportClient(Client client) {
        this.client = client;
    }

    /**
     * Waits until the cluster reaches at least the yellow health status for the given indexes.
     *
     * @param indexNames the indexes whose health is checked
     * @param timeoutSecond the maximum time to wait, in seconds
     * @return {@code true} if the cluster is GREEN once the wait completes, {@code false} if it is ready but not
     *         GREEN
     * @throws NuxeoException if the health request timed out or no Elasticsearch node is available
     */
    @Override
    public boolean waitForYellowStatus(String[] indexNames, int timeoutSecond) {
        String timeout = timeoutSecond + "s";
        // Only build the message (Arrays.toString) when it will actually be logged.
        if (log.isDebugEnabled()) {
            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
        }
        try {
            ClusterHealthResponse response = client.admin()
                                                   .cluster()
                                                   .prepareHealth(indexNames)
                                                   .setTimeout(timeout)
                                                   .setWaitForYellowStatus()
                                                   .get();
            if (response.isTimedOut()) {
                throw new NuxeoException(
                        "Elasticsearch Cluster health status not Yellow after " + timeout + " give up: " + response);
            }
            if (response.getStatus() != ClusterHealthStatus.GREEN) {
                log.warn("Elasticsearch Cluster ready but not GREEN: " + response);
                return false;
            }
            log.info("Elasticsearch Cluster ready: " + response);
        } catch (NoNodeAvailableException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new NuxeoException(
                    "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage(), e);
        }
        return true;
    }

    /**
     * Returns the current cluster health status for the given indexes.
     *
     * @param indexNames the indexes whose health is checked
     * @return the status reported by the cluster health request
     */
    @Override
    public ClusterHealthStatus getHealthStatus(String[] indexNames) {
        return client.admin().cluster().prepareHealth(indexNames).get().getStatus();
    }

    /**
     * Issues a refresh request on the given index and waits for the response.
     *
     * @param indexName the index to refresh
     */
    @Override
    public void refresh(String indexName) {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    /**
     * Issues a flush request on the given index and waits for the response.
     *
     * @param indexName the index to flush
     */
    @Override
    public void flush(String indexName) {
        client.admin().indices().prepareFlush(indexName).get();
    }

    /**
     * Issues a force-merge request on the given index and waits for the response.
     *
     * @param indexName the index to force-merge
     */
    @Override
    public void optimize(String indexName) {
        client.admin().indices().prepareForceMerge(indexName).get();
    }

    /**
     * Tells whether the given index exists.
     *
     * @param indexName the index name to look up
     * @return {@code true} if the exists request reports the index as existing
     */
    @Override
    public boolean indexExists(String indexName) {
        return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
    }

    /**
     * Tells whether a mapping for the given type exists on the given index.
     *
     * @param indexName the index (or alias) name to look up
     * @param type the mapping type to look for
     * @return {@code true} if the first index returned for {@code indexName} holds a mapping for {@code type},
     *         {@code false} if no mapping is returned at all or the type is absent
     */
    @Override
    public boolean mappingExists(String indexName, String type) {
        GetMappingsResponse mappings = client.admin().indices().prepareGetMappings(indexName).execute().actionGet();
        if (mappings == null || mappings.getMappings().isEmpty()) {
            return false;
        }
        // The real index might have another name if indexName is an alias so we check the mapping of the first item.
        return mappings.getMappings().values().iterator().next().value.containsKey(type);
    }

    /**
     * Deletes the given index and waits for the response.
     *
     * @param indexName the index to delete
     * @param timeoutSecond the timeout in seconds, applied both as request timeout and as master node timeout
     */
    @Override
    public void deleteIndex(String indexName, int timeoutSecond) {
        TimeValue timeout = new TimeValue(timeoutSecond, TimeUnit.SECONDS);
        client.admin()
              .indices()
              .delete(new DeleteIndexRequest(indexName).timeout(timeout).masterNodeTimeout(timeout))
              .actionGet();
    }

    /**
     * Creates an index with the given JSON settings and waits for the response.
     *
     * @param indexName the name of the index to create
     * @param jsonSettings the index settings, as a JSON string
     */
    @Override
    public void createIndex(String indexName, String jsonSettings) {
        client.admin().indices().prepareCreate(indexName).setSettings(jsonSettings, XContentType.JSON).get();
    }

    /**
     * Puts a mapping for the given type on the given index and waits for the response.
     *
     * @param indexName the index receiving the mapping
     * @param type the mapping type
     * @param jsonMapping the mapping definition, as a JSON string
     */
    @Override
    public void createMapping(String indexName, String type, String jsonMapping) {
        client.admin()
              .indices()
              .preparePutMapping(indexName)
              .setType(type)
              .setSource(jsonMapping, XContentType.JSON)
              .get();
    }

    /**
     * Returns the string form of the cluster nodes info response.
     *
     * @return the {@code toString()} of the nodes info response
     */
    @Override
    public String getNodesInfo() {
        return client.admin().cluster().prepareNodesInfo().get().toString();
    }

    /**
     * Returns the string form of the cluster nodes stats response.
     *
     * @return the {@code toString()} of the nodes stats response
     */
    @Override
    public String getNodesStats() {
        return client.admin().cluster().prepareNodesStats().get().toString();
    }

    /**
     * Tells whether the given alias exists.
     *
     * @param aliasName the alias name to look up
     * @return {@code true} if the aliases-exist request reports the alias as existing
     */
    @Override
    public boolean aliasExists(String aliasName) {
        return client.admin().indices().prepareAliasesExist(aliasName).get().isExists();
    }

    /**
     * Returns the first index found that carries the given alias.
     *
     * @param aliasName the alias name to resolve
     * @return the name of the first index with a non-empty alias list in the response, or {@code null} if there is
     *         none
     */
    @Override
    public String getFirstIndexForAlias(String aliasName) {
        ImmutableOpenMap<String, List<AliasMetaData>> aliases = client.admin()
                                                                      .indices()
                                                                      .prepareGetAliases(aliasName)
                                                                      .get()
                                                                      .getAliases();
        for (Iterator<String> it = aliases.keysIt(); it.hasNext();) {
            String indexName = it.next();
            // Skip entries whose alias list is empty: they do not carry the alias.
            if (!aliases.get(indexName).isEmpty()) {
                return indexName;
            }
        }
        return null;
    }

    /**
     * Points the given alias to the given index. If the alias already exists, it is removed from the first index
     * currently carrying it within the same aliases request.
     *
     * @param aliasName the alias to create or move
     * @param indexName the index the alias must point to
     */
    @Override
    public void updateAlias(String aliasName, String indexName) {
        IndicesAliasesRequestBuilder cmd = client.admin().indices().prepareAliases();
        if (aliasExists(aliasName)) {
            String currentIndex = getFirstIndexForAlias(aliasName);
            // getFirstIndexForAlias returns null when no index carries the alias (e.g. it was removed in between);
            // in that case there is nothing to remove.
            if (currentIndex != null) {
                cmd.removeAlias(currentIndex, aliasName);
            }
        }
        cmd.addAlias(indexName, aliasName).execute().actionGet();
    }

    /**
     * Executes a bulk request and waits for the response.
     *
     * @param request the bulk request
     * @return the bulk response
     */
    @Override
    public BulkResponse bulk(BulkRequest request) {
        return client.bulk(request).actionGet();
    }

    /**
     * Executes a delete request and waits for the response.
     *
     * @param request the delete request
     * @return the delete response
     */
    @Override
    public DeleteResponse delete(DeleteRequest request) {
        return client.delete(request).actionGet();
    }

    /**
     * Executes a search request and waits for the response.
     *
     * @param request the search request
     * @return the search response
     */
    @Override
    public SearchResponse search(SearchRequest request) {
        return client.search(request).actionGet();
    }

    /**
     * Executes a search scroll request and waits for the response.
     *
     * @param request the search scroll request
     * @return the search response
     */
    @Override
    public SearchResponse searchScroll(SearchScrollRequest request) {
        return client.searchScroll(request).actionGet();
    }

    /**
     * Executes a get request and waits for the response.
     *
     * @param request the get request
     * @return the get response
     */
    @Override
    public GetResponse get(GetRequest request) {
        return client.get(request).actionGet();
    }

    /**
     * Executes an index request and waits for the response.
     *
     * @param request the index request
     * @return the index response
     * @throws ConcurrentUpdateException if Elasticsearch reports a version conflict
     */
    @Override
    public IndexResponse index(IndexRequest request) {
        try {
            return client.index(request).actionGet();
        } catch (VersionConflictEngineException e) {
            throw new ConcurrentUpdateException(e);
        }
    }

    /**
     * Executes a clear scroll request and waits for the response.
     *
     * @param request the clear scroll request
     * @return the clear scroll response
     */
    @Override
    public ClearScrollResponse clearScroll(ClearScrollRequest request) {
        return client.clearScroll(request).actionGet();
    }

    /**
     * Creates a bulk processor builder bound to the wrapped client.
     *
     * @param listener the listener to attach to the bulk processor
     * @return a new bulk processor builder
     */
    @Override
    public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client, listener);
    }

    /**
     * Closes the wrapped client, if any, and drops the reference to it. Calling this method again is a no-op.
     */
    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}