001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
024
025import java.net.InetAddress;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.NoSuchElementException;
036import java.util.Set;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
042import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
043import org.elasticsearch.client.Client;
044import org.elasticsearch.client.transport.NoNodeAvailableException;
045import org.elasticsearch.client.transport.TransportClient;
046import org.elasticsearch.cluster.health.ClusterHealthStatus;
047import org.elasticsearch.common.settings.Settings;
048import org.elasticsearch.common.settings.Settings.Builder;
049import org.elasticsearch.common.transport.InetSocketTransportAddress;
050import org.elasticsearch.node.Node;
051import org.elasticsearch.node.NodeBuilder;
052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
053import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
054import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
055import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
056import org.nuxeo.runtime.api.Framework;
057
058import com.google.common.util.concurrent.ListenableFuture;
059
060/**
061 * @since 6.0
062 */
063public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
064    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
065
066    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";
067
068    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
069
070    private final Map<String, String> indexNames = new HashMap<>();
071
072    private final Map<String, String> repoNames = new HashMap<>();
073
074    private final Map<String, ElasticSearchIndexConfig> indexConfig;
075
076    private Node localNode;
077
078    private Client client;
079
080    private boolean indexInitDone = false;
081
082    private final ElasticSearchLocalConfig localConfig;
083
084    private final ElasticSearchRemoteConfig remoteConfig;
085
086    private String[] includeSourceFields;
087
088    private String[] excludeSourceFields;
089
090    private boolean embedded = true;
091
092    private List<String> repositoryInitialized = new ArrayList<>();
093
094    /**
095     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
096     */
097    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
098            Map<String, ElasticSearchIndexConfig> indexConfig) {
099        this.remoteConfig = remoteConfig;
100        this.localConfig = localConfig;
101        this.indexConfig = indexConfig;
102        connect();
103        initializeIndexes();
104    }
105
106    private void connect() {
107        if (client != null) {
108            return;
109        }
110        if (remoteConfig != null) {
111            client = connectToRemote(remoteConfig);
112            embedded = false;
113        } else {
114            localNode = createEmbeddedNode(localConfig);
115            client = connectToEmbedded();
116            embedded = true;
117        }
118        checkClusterHealth();
119        log.info("ES Connected");
120    }
121
122    public void disconnect() {
123        if (client != null) {
124            client.close();
125            client = null;
126            indexInitDone = false;
127            log.info("ES Disconnected");
128        }
129        if (localNode != null) {
130            localNode.close();
131            localNode = null;
132            log.info("ES embedded Node Stopped");
133        }
134    }
135
136    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
137        log.info("ES embedded Node Initializing (local in JVM)");
138        if (conf == null) {
139            throw new IllegalStateException("No embedded configuration defined");
140        }
141        if (!Framework.isTestModeSet()) {
142            log.warn("Elasticsearch embedded configuration is ONLY for testing"
143                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
144        }
145        Builder sBuilder = Settings.settingsBuilder();
146        sBuilder.put("http.enabled", conf.httpEnabled())
147                .put("network.host", conf.getNetworkHost())
148                .put("path.home", conf.getHomePath())
149                .put("path.data", conf.getDataPath())
150                .put("index.number_of_shards", 1)
151                .put("index.number_of_replicas", 0)
152                .put("cluster.name", conf.getClusterName())
153                .put("node.name", conf.getNodeName())
154                .put("http.netty.worker_count", 4)
155                .put("http.cors.enabled", true)
156                .put("cluster.routing.allocation.disk.threshold_enabled", false)
157                .put("http.port", conf.getHttpPort());
158        if (conf.getIndexStorageType() != null) {
159            sBuilder.put("index.store.type", conf.getIndexStorageType());
160        }
161        Settings settings = sBuilder.build();
162        log.debug("Using settings: " + settings.toDelimitedString(','));
163        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
164        assert ret != null : "Can not create an embedded ES Node";
165        return ret;
166    }
167
168    private Client connectToEmbedded() {
169        log.info("Connecting to embedded ES");
170        Client ret = localNode.start().client();
171        assert ret != null : "Can not connect to embedded ES Node";
172        return ret;
173    }
174
175    private Client connectToRemote(ElasticSearchRemoteConfig config) {
176        log.info("Connecting to remote ES cluster: " + config);
177        Builder builder = Settings.settingsBuilder()
178                                  .put("cluster.name", config.getClusterName())
179                                  .put("client.transport.nodes_sampler_interval", config.getSamplerInterval())
180                                  .put("client.transport.ping_timeout", config.getPingTimeout())
181                                  .put("client.transport.ignore_cluster_name", config.isIgnoreClusterName())
182                                  .put("client.transport.sniff", config.isClusterSniff());
183        Settings settings = builder.build();
184        if (log.isDebugEnabled()) {
185            log.debug("Using settings: " + settings.toDelimitedString(','));
186        }
187        TransportClient ret = TransportClient.builder().settings(settings).build();
188        String[] addresses = config.getAddresses();
189        if (addresses == null) {
190            log.error("You need to provide an addressList to join a cluster");
191        } else {
192            for (String item : config.getAddresses()) {
193                String[] address = item.split(":");
194                log.debug("Add transport address: " + item);
195                try {
196                    InetAddress inet = InetAddress.getByName(address[0]);
197                    ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
198                } catch (UnknownHostException e) {
199                    log.error("Unable to resolve host " + address[0], e);
200                }
201            }
202        }
203        assert ret != null : "Unable to create a remote client";
204        return ret;
205    }
206
207    private void checkClusterHealth(String... indexNames) {
208        if (client == null) {
209            throw new IllegalStateException("No es client available");
210        }
211        String errorMessage = null;
212        try {
213            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
214            ClusterHealthResponse ret = client.admin()
215                                              .cluster()
216                                              .prepareHealth(indexNames)
217                                              .setTimeout(TIMEOUT_WAIT_FOR_CLUSTER)
218                                              .setWaitForYellowStatus()
219                                              .get();
220            if (ret.isTimedOut()) {
221                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
222                        + ret;
223            } else {
224                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
225                    log.warn("Es Cluster ready but not GREEN: " + ret);
226                } else {
227                    log.info("ES Cluster ready: " + ret);
228                }
229            }
230        } catch (NoNodeAvailableException e) {
231            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
232        }
233        if (errorMessage != null) {
234            log.error(errorMessage);
235            throw new RuntimeException(errorMessage);
236        }
237    }
238
239    private void initializeIndexes() {
240        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
241            if (conf.isDocumentIndex()) {
242                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
243                indexNames.put(conf.getRepositoryName(), conf.getName());
244                repoNames.put(conf.getName(), conf.getRepositoryName());
245                Set<String> set = new LinkedHashSet<>();
246                if (includeSourceFields != null) {
247                    set.addAll(Arrays.asList(includeSourceFields));
248                }
249                set.addAll(Arrays.asList(conf.getIncludes()));
250                if (set.contains(ALL_FIELDS)) {
251                    set.clear();
252                    set.add(ALL_FIELDS);
253                }
254                includeSourceFields = set.toArray(new String[set.size()]);
255                set.clear();
256                if (excludeSourceFields != null) {
257                    set.addAll(Arrays.asList(excludeSourceFields));
258                }
259                set.addAll(Arrays.asList(conf.getExcludes()));
260                excludeSourceFields = set.toArray(new String[set.size()]);
261            }
262
263        }
264        initIndexes(false);
265    }
266
267    // Admin Impl =============================================================
268    @Override
269    public void refreshRepositoryIndex(String repositoryName) {
270        if (log.isDebugEnabled()) {
271            log.debug("Refreshing index associated with repo: " + repositoryName);
272        }
273        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
274        if (log.isDebugEnabled()) {
275            log.debug("Refreshing index done");
276        }
277    }
278
279    @Override
280    public String getIndexNameForRepository(String repositoryName) {
281        String ret = indexNames.get(repositoryName);
282        if (ret == null) {
283            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
284        }
285        return ret;
286    }
287
288    @Override
289    public List<String> getIndexNamesForType(String type) {
290        List<String> indexNames = new ArrayList<>();
291        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
292            if (type.equals(conf.getType())) {
293                indexNames.add(conf.getName());
294            }
295        }
296        return indexNames;
297    }
298
299    @Override
300    public String getIndexNameForType(String type) {
301        List<String> indexNames = getIndexNamesForType(type);
302        if (indexNames.isEmpty()) {
303            throw new NoSuchElementException("No index defined for type: " + type);
304        }
305        return indexNames.get(0);
306    }
307
308    @Override
309    public void flushRepositoryIndex(String repositoryName) {
310        log.warn("Flushing index associated with repo: " + repositoryName);
311        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
312        log.info("Flushing index done");
313    }
314
315    @Override
316    public void refresh() {
317        for (String repositoryName : indexNames.keySet()) {
318            refreshRepositoryIndex(repositoryName);
319        }
320    }
321
322    @Override
323    public void flush() {
324        for (String repositoryName : indexNames.keySet()) {
325            flushRepositoryIndex(repositoryName);
326        }
327    }
328
329    @Override
330    public void optimizeIndex(String indexName) {
331        log.warn("Optimizing index: " + indexName);
332        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
333            if (conf.getName().equals(indexName)) {
334                getClient().admin().indices().prepareForceMerge(indexName).get();
335            }
336        }
337        log.info("Optimize done");
338    }
339
340    @Override
341    public void optimizeRepositoryIndex(String repositoryName) {
342        optimizeIndex(getIndexNameForRepository(repositoryName));
343    }
344
345    @Override
346    public void optimize() {
347        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
348            optimizeIndex(conf.getName());
349        }
350    }
351
352    @Override
353    public Client getClient() {
354        return client;
355    }
356
357    @Override
358    public void initIndexes(boolean dropIfExists) {
359        indexInitDone = false;
360        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
361            initIndex(conf, dropIfExists);
362        }
363        log.info("ES Service ready");
364        indexInitDone = true;
365    }
366
367    @Override
368    public void dropAndInitIndex(String indexName) {
369        log.info("Drop and init index: " + indexName);
370        indexInitDone = false;
371        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
372            if (conf.getName().equals(indexName)) {
373                initIndex(conf, true);
374            }
375        }
376        indexInitDone = true;
377    }
378
379    @Override
380    public void dropAndInitRepositoryIndex(String repositoryName) {
381        log.info("Drop and init index of repository: " + repositoryName);
382        indexInitDone = false;
383        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
384            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
385                initIndex(conf, true);
386            }
387        }
388        indexInitDone = true;
389    }
390
391    @Override
392    public List<String> getRepositoryNames() {
393        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
394    }
395
396    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
397        if (!conf.mustCreate()) {
398            return;
399        }
400        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
401        boolean mappingExists = false;
402        boolean indexExists = getClient().admin()
403                                         .indices()
404                                         .prepareExists(conf.getName())
405                                         .execute()
406                                         .actionGet()
407                                         .isExists();
408        if (indexExists) {
409            if (!dropIfExists) {
410                log.debug("Index " + conf.getName() + " already exists");
411                mappingExists = getClient().admin()
412                                           .indices()
413                                           .prepareGetMappings(conf.getName())
414                                           .execute()
415                                           .actionGet()
416                                           .getMappings()
417                                           .get(conf.getName())
418                                           .containsKey(conf.getType());
419            } else {
420                if (!Framework.isTestModeSet()) {
421                    log.warn(String.format(
422                            "Initializing index: %s, type: %s with " + "dropIfExists flag, deleting an existing index",
423                            conf.getName(), conf.getType()));
424                }
425                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
426                indexExists = false;
427            }
428        }
429        if (!indexExists) {
430            log.info(String.format("Creating index: %s", conf.getName()));
431            if (log.isDebugEnabled()) {
432                log.debug("Using settings: " + conf.getSettings());
433            }
434            getClient().admin()
435                       .indices()
436                       .prepareCreate(conf.getName())
437                       .setSettings(conf.getSettings())
438                       .execute()
439                       .actionGet();
440        }
441        if (!mappingExists) {
442            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
443            if (log.isDebugEnabled()) {
444                log.debug("Using mapping: " + conf.getMapping());
445            }
446            getClient().admin()
447                       .indices()
448                       .preparePutMapping(conf.getName())
449                       .setType(conf.getType())
450                       .setSource(conf.getMapping())
451                       .execute()
452                       .actionGet();
453            if (!dropIfExists && conf.getRepositoryName() != null) {
454                repositoryInitialized.add(conf.getRepositoryName());
455            }
456        }
457        // make sure the index is ready before returning
458        checkClusterHealth(conf.getName());
459    }
460
461    @Override
462    public long getPendingWorkerCount() {
463        // impl of scheduling is left to the ESService
464        throw new UnsupportedOperationException("Not implemented");
465    }
466
467    @Override
468    public long getRunningWorkerCount() {
469        // impl of scheduling is left to the ESService
470        throw new UnsupportedOperationException("Not implemented");
471    }
472
473    @Override
474    public int getTotalCommandProcessed() {
475        return totalCommandProcessed.get();
476    }
477
478    @Override
479    public boolean isEmbedded() {
480        return embedded;
481    }
482
483    @Override
484    public boolean useExternalVersion() {
485        if (isEmbedded()) {
486            return localConfig.useExternalVersion();
487        }
488        return remoteConfig.useExternalVersion();
489    }
490
491    @Override
492    public boolean isIndexingInProgress() {
493        // impl of scheduling is left to the ESService
494        throw new UnsupportedOperationException("Not implemented");
495    }
496
497    @Override
498    public ListenableFuture<Boolean> prepareWaitForIndexing() {
499        throw new UnsupportedOperationException("Not implemented");
500    }
501
502    /**
503     * Get the elastic search indexes for searches
504     */
505    String[] getSearchIndexes(List<String> searchRepositories) {
506        if (searchRepositories.isEmpty()) {
507            Collection<String> values = indexNames.values();
508            return values.toArray(new String[values.size()]);
509        }
510        String[] ret = new String[searchRepositories.size()];
511        int i = 0;
512        for (String repo : searchRepositories) {
513            ret[i++] = getIndexNameForRepository(repo);
514        }
515        return ret;
516    }
517
518    public boolean isReady() {
519        return indexInitDone;
520    }
521
522    String[] getIncludeSourceFields() {
523        return includeSourceFields;
524    }
525
526    String[] getExcludeSourceFields() {
527        return excludeSourceFields;
528    }
529
530    Map<String, String> getRepositoryMap() {
531        return repoNames;
532    }
533
534    /**
535     * Get the list of repository names that have their index created.
536     */
537    public List<String> getInitializedRepositories() {
538        return repositoryInitialized;
539    }
540}