001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.net.InetAddress;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.NoSuchElementException;
036import java.util.Set;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
042import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
043import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
044import org.elasticsearch.client.Client;
045import org.elasticsearch.client.transport.NoNodeAvailableException;
046import org.elasticsearch.client.transport.TransportClient;
047import org.elasticsearch.common.settings.ImmutableSettings;
048import org.elasticsearch.common.settings.ImmutableSettings.Builder;
049import org.elasticsearch.common.settings.Settings;
050import org.elasticsearch.common.transport.InetSocketTransportAddress;
051import org.elasticsearch.node.Node;
052import org.elasticsearch.node.NodeBuilder;
053import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
054import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
055import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
056import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
057import org.nuxeo.runtime.api.Framework;
058
059import com.google.common.util.concurrent.ListenableFuture;
060
061/**
062 * @since 6.0
063 */
064public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
065    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
066
067    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";
068
069    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
070
071    private final Map<String, String> indexNames = new HashMap<>();
072
073    private final Map<String, String> repoNames = new HashMap<>();
074
075    private final Map<String, ElasticSearchIndexConfig> indexConfig;
076
077    private Node localNode;
078
079    private Client client;
080
081    private boolean indexInitDone = false;
082
083    private final ElasticSearchLocalConfig localConfig;
084
085    private final ElasticSearchRemoteConfig remoteConfig;
086
087    private String[] includeSourceFields;
088
089    private String[] excludeSourceFields;
090
091    private boolean embedded = true;
092
093    private List<String> repositoryInitialized = new ArrayList<>();
094
095    /**
096     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
097     */
098    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
099            Map<String, ElasticSearchIndexConfig> indexConfig) {
100        this.remoteConfig = remoteConfig;
101        this.localConfig = localConfig;
102        this.indexConfig = indexConfig;
103        connect();
104        initializeIndexes();
105    }
106
107    private void connect() {
108        if (client != null) {
109            return;
110        }
111        if (remoteConfig != null) {
112            client = connectToRemote(remoteConfig);
113            embedded = false;
114        } else {
115            localNode = createEmbeddedNode(localConfig);
116            client = connectToEmbedded();
117            embedded = true;
118        }
119        checkClusterHealth();
120        log.info("ES Connected");
121    }
122
123    public void disconnect() {
124        if (client != null) {
125            client.close();
126            client = null;
127            indexInitDone = false;
128            log.info("ES Disconnected");
129        }
130        if (localNode != null) {
131            localNode.close();
132            localNode = null;
133            log.info("ES embedded Node Stopped");
134        }
135    }
136
137    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
138        log.info("ES embedded Node Initializing (local in JVM)");
139        if (conf == null) {
140            throw new IllegalStateException("No embedded configuration defined");
141        }
142        if (!Framework.isTestModeSet()) {
143            log.warn("Elasticsearch embedded configuration is ONLY for testing"
144                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
145        }
146        Builder sBuilder = ImmutableSettings.settingsBuilder();
147        sBuilder.put("http.enabled", conf.httpEnabled()).put("network.host", conf.getNetworkHost()).put("path.data",
148                conf.getDataPath()).put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put(
149                "cluster.name", conf.getClusterName()).put("node.name", conf.getNodeName()).put(
150                "http.netty.worker_count", 4).put("http.cors.enabled", true).put(
151                "cluster.routing.allocation.disk.threshold_enabled", false);
152        if (conf.getIndexStorageType() != null) {
153            sBuilder.put("index.store.type", conf.getIndexStorageType());
154            if (conf.getIndexStorageType().equals("memory")) {
155                sBuilder.put("gateway.type", "none");
156            }
157        }
158        Settings settings = sBuilder.build();
159        log.debug("Using settings: " + settings.toDelimitedString(','));
160        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
161        assert ret != null : "Can not create an embedded ES Node";
162        return ret;
163    }
164
165    private Client connectToEmbedded() {
166        log.info("Connecting to embedded ES");
167        Client ret = localNode.start().client();
168        assert ret != null : "Can not connect to embedded ES Node";
169        return ret;
170    }
171
172    private Client connectToRemote(ElasticSearchRemoteConfig config) {
173        log.info("Connecting to remote ES cluster: " + config);
174        Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", config.getClusterName()).put(
175                "client.transport.nodes_sampler_interval", config.getSamplerInterval()).put(
176                "client.transport.ping_timeout", config.getPingTimeout()).put("client.transport.ignore_cluster_name",
177                config.isIgnoreClusterName()).put("client.transport.sniff", config.isClusterSniff());
178        Settings settings = builder.build();
179        if (log.isDebugEnabled()) {
180            log.debug("Using settings: " + settings.toDelimitedString(','));
181        }
182        TransportClient ret = new TransportClient(settings);
183        String[] addresses = config.getAddresses();
184        if (addresses == null) {
185            log.error("You need to provide an addressList to join a cluster");
186        } else {
187            for (String item : config.getAddresses()) {
188                String[] address = item.split(":");
189                log.debug("Add transport address: " + item);
190                try {
191                    InetAddress inet = InetAddress.getByName(address[0]);
192                    ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
193                } catch (UnknownHostException e) {
194                    log.error("Unable to resolve host " + address[0], e);
195                }
196            }
197        }
198        assert ret != null : "Unable to create a remote client";
199        return ret;
200    }
201
202    private void checkClusterHealth(String... indexNames) {
203        if (client == null) {
204            throw new IllegalStateException("No es client available");
205        }
206        String errorMessage = null;
207        try {
208            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
209            ClusterHealthResponse ret = client.admin().cluster().prepareHealth(indexNames).setTimeout(
210                    TIMEOUT_WAIT_FOR_CLUSTER).setWaitForYellowStatus().get();
211            if (ret.isTimedOut()) {
212                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
213                        + ret;
214            } else {
215                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
216                    log.warn("Es Cluster ready but not GREEN: " + ret);
217                } else {
218                    log.info("ES Cluster ready: " + ret);
219                }
220            }
221        } catch (NoNodeAvailableException e) {
222            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
223        }
224        if (errorMessage != null) {
225            log.error(errorMessage);
226            throw new RuntimeException(errorMessage);
227        }
228    }
229
230    private void initializeIndexes() {
231        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
232            if (conf.isDocumentIndex()) {
233                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
234                indexNames.put(conf.getRepositoryName(), conf.getName());
235                repoNames.put(conf.getName(), conf.getRepositoryName());
236                Set<String> set = new LinkedHashSet<>();
237                if (includeSourceFields != null) {
238                    set.addAll(Arrays.asList(includeSourceFields));
239                }
240                set.addAll(Arrays.asList(conf.getIncludes()));
241                if (set.contains(ALL_FIELDS)) {
242                    set.clear();
243                    set.add(ALL_FIELDS);
244                }
245                includeSourceFields = set.toArray(new String[set.size()]);
246                set.clear();
247                if (excludeSourceFields != null) {
248                    set.addAll(Arrays.asList(excludeSourceFields));
249                }
250                set.addAll(Arrays.asList(conf.getExcludes()));
251                excludeSourceFields = set.toArray(new String[set.size()]);
252            }
253
254        }
255        initIndexes(false);
256    }
257
258    // Admin Impl =============================================================
259    @Override
260    public void refreshRepositoryIndex(String repositoryName) {
261        if (log.isDebugEnabled()) {
262            log.debug("Refreshing index associated with repo: " + repositoryName);
263        }
264        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
265        if (log.isDebugEnabled()) {
266            log.debug("Refreshing index done");
267        }
268    }
269
270    @Override
271    public String getIndexNameForRepository(String repositoryName) {
272        String ret = indexNames.get(repositoryName);
273        if (ret == null) {
274            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
275        }
276        return ret;
277    }
278
279    @Override
280    public List<String> getIndexNamesForType(String type) {
281        List<String> indexNames = new ArrayList<>();
282        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
283            if (type.equals(conf.getType())) {
284                indexNames.add(conf.getName());
285            }
286        }
287        return indexNames;
288    }
289
290    @Override
291    public String getIndexNameForType(String type) {
292        List<String> indexNames = getIndexNamesForType(type);
293        if (indexNames.isEmpty()) {
294            throw new NoSuchElementException("No index defined for type: " + type);
295        }
296        return indexNames.get(0);
297    }
298
299    @Override
300    public void flushRepositoryIndex(String repositoryName) {
301        log.warn("Flushing index associated with repo: " + repositoryName);
302        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
303        log.info("Flushing index done");
304    }
305
306    @Override
307    public void refresh() {
308        for (String repositoryName : indexNames.keySet()) {
309            refreshRepositoryIndex(repositoryName);
310        }
311    }
312
313    @Override
314    public void flush() {
315        for (String repositoryName : indexNames.keySet()) {
316            flushRepositoryIndex(repositoryName);
317        }
318    }
319
320    @Override
321    public void optimizeIndex(String indexName) {
322        log.warn("Optimizing index: " + indexName);
323        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
324            if (conf.getName().equals(indexName)) {
325                getClient().admin().indices().prepareOptimize(indexName).get();
326            }
327        }
328        log.info("Optimize done");
329    }
330
331    @Override
332    public void optimizeRepositoryIndex(String repositoryName) {
333        optimizeIndex(getIndexNameForRepository(repositoryName));
334    }
335
336    @Override
337    public void optimize() {
338        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
339            optimizeIndex(conf.getName());
340        }
341    }
342
343    @Override
344    public Client getClient() {
345        return client;
346    }
347
348    @Override
349    public void initIndexes(boolean dropIfExists) {
350        indexInitDone = false;
351        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
352            initIndex(conf, dropIfExists);
353        }
354        log.info("ES Service ready");
355        indexInitDone = true;
356    }
357
358    @Override
359    public void dropAndInitIndex(String indexName) {
360        log.info("Drop and init index: " + indexName);
361        indexInitDone = false;
362        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
363            if (conf.getName().equals(indexName)) {
364                initIndex(conf, true);
365            }
366        }
367        indexInitDone = true;
368    }
369
370    @Override
371    public void dropAndInitRepositoryIndex(String repositoryName) {
372        log.info("Drop and init index of repository: " + repositoryName);
373        indexInitDone = false;
374        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
375            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
376                initIndex(conf, true);
377            }
378        }
379        indexInitDone = true;
380    }
381
382    @Override
383    public List<String> getRepositoryNames() {
384        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
385    }
386
387    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
388        if (!conf.mustCreate()) {
389            return;
390        }
391        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
392        boolean mappingExists = false;
393        boolean indexExists = getClient().admin().indices().prepareExists(conf.getName()).execute().actionGet().isExists();
394        if (indexExists) {
395            if (!dropIfExists) {
396                log.debug("Index " + conf.getName() + " already exists");
397                mappingExists = getClient().admin().indices().prepareGetMappings(conf.getName()).execute().actionGet().getMappings().get(
398                        conf.getName()).containsKey(conf.getType());
399            } else {
400                if (!Framework.isTestModeSet()) {
401                    log.warn(String.format("Initializing index: %s, type: %s with "
402                            + "dropIfExists flag, deleting an existing index", conf.getName(), conf.getType()));
403                }
404                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
405                indexExists = false;
406            }
407        }
408        if (!indexExists) {
409            log.info(String.format("Creating index: %s", conf.getName()));
410            if (log.isDebugEnabled()) {
411                log.debug("Using settings: " + conf.getSettings());
412            }
413            getClient().admin().indices().prepareCreate(conf.getName()).setSettings(conf.getSettings()).execute().actionGet();
414        }
415        if (!mappingExists) {
416            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
417            if (log.isDebugEnabled()) {
418                log.debug("Using mapping: " + conf.getMapping());
419            }
420            getClient().admin().indices().preparePutMapping(conf.getName()).setType(conf.getType()).setSource(
421                    conf.getMapping()).execute().actionGet();
422            if (!dropIfExists && conf.getRepositoryName() != null) {
423                repositoryInitialized.add(conf.getRepositoryName());
424            }
425        }
426        // make sure the index is ready before returning
427        checkClusterHealth(conf.getName());
428    }
429
430    @Override
431    public int getPendingWorkerCount() {
432        // impl of scheduling is left to the ESService
433        throw new UnsupportedOperationException("Not implemented");
434    }
435
436    @Override
437    public int getRunningWorkerCount() {
438        // impl of scheduling is left to the ESService
439        throw new UnsupportedOperationException("Not implemented");
440    }
441
442    @Override
443    public int getTotalCommandProcessed() {
444        return totalCommandProcessed.get();
445    }
446
447    @Override
448    public boolean isEmbedded() {
449        return embedded;
450    }
451
452    @Override
453    public boolean isIndexingInProgress() {
454        // impl of scheduling is left to the ESService
455        throw new UnsupportedOperationException("Not implemented");
456    }
457
458    @Override
459    public ListenableFuture<Boolean> prepareWaitForIndexing() {
460        throw new UnsupportedOperationException("Not implemented");
461    }
462
463    /**
464     * Get the elastic search indexes for searches
465     */
466    String[] getSearchIndexes(List<String> searchRepositories) {
467        if (searchRepositories.isEmpty()) {
468            Collection<String> values = indexNames.values();
469            return values.toArray(new String[values.size()]);
470        }
471        String[] ret = new String[searchRepositories.size()];
472        int i = 0;
473        for (String repo : searchRepositories) {
474            ret[i++] = getIndexNameForRepository(repo);
475        }
476        return ret;
477    }
478
479    public boolean isReady() {
480        return indexInitDone;
481    }
482
483    String[] getIncludeSourceFields() {
484        return includeSourceFields;
485    }
486
487    String[] getExcludeSourceFields() {
488        return excludeSourceFields;
489    }
490
491    Map<String, String> getRepositoryMap() {
492        return repoNames;
493    }
494
495    /**
496     * Get the list of repository names that have their index created.
497     */
498    public List<String> getInitializedRepositories() {
499        return repositoryInitialized;
500    }
501}