001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     bdelbosc
017 */
018
019package org.nuxeo.elasticsearch.core;
020
021import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
022
023import java.net.InetAddress;
024import java.net.UnknownHostException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.LinkedHashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.NoSuchElementException;
034import java.util.Set;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
040import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
041import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
042import org.elasticsearch.client.Client;
043import org.elasticsearch.client.transport.NoNodeAvailableException;
044import org.elasticsearch.client.transport.TransportClient;
045import org.elasticsearch.common.settings.ImmutableSettings;
046import org.elasticsearch.common.settings.ImmutableSettings.Builder;
047import org.elasticsearch.common.settings.Settings;
048import org.elasticsearch.common.transport.InetSocketTransportAddress;
049import org.elasticsearch.node.Node;
050import org.elasticsearch.node.NodeBuilder;
051import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
052import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
053import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
054import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
055import org.nuxeo.runtime.api.Framework;
056
057import com.google.common.util.concurrent.ListenableFuture;
058
059/**
060 * @since 6.0
061 */
062public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
063    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
064
065    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";
066
067    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
068
069    private final Map<String, String> indexNames = new HashMap<>();
070
071    private final Map<String, String> repoNames = new HashMap<>();
072
073    private final Map<String, ElasticSearchIndexConfig> indexConfig;
074
075    private Node localNode;
076
077    private Client client;
078
079    private boolean indexInitDone = false;
080
081    private final ElasticSearchLocalConfig localConfig;
082
083    private final ElasticSearchRemoteConfig remoteConfig;
084
085    private String[] includeSourceFields;
086
087    private String[] excludeSourceFields;
088
089    private boolean embedded = true;
090
091    private List<String> repositoryInitialized = new ArrayList<>();
092
093    /**
094     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
095     */
096    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
097            Map<String, ElasticSearchIndexConfig> indexConfig) {
098        this.remoteConfig = remoteConfig;
099        this.localConfig = localConfig;
100        this.indexConfig = indexConfig;
101        connect();
102        initializeIndexes();
103    }
104
105    private void connect() {
106        if (client != null) {
107            return;
108        }
109        if (remoteConfig != null) {
110            client = connectToRemote(remoteConfig);
111            embedded = false;
112        } else {
113            localNode = createEmbeddedNode(localConfig);
114            client = connectToEmbedded();
115            embedded = true;
116        }
117        checkClusterHealth();
118        log.info("ES Connected");
119    }
120
121    public void disconnect() {
122        if (client != null) {
123            client.close();
124            client = null;
125            indexInitDone = false;
126            log.info("ES Disconnected");
127        }
128        if (localNode != null) {
129            localNode.close();
130            localNode = null;
131            log.info("ES embedded Node Stopped");
132        }
133    }
134
135    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
136        log.info("ES embedded Node Initializing (local in JVM)");
137        if (conf == null) {
138            throw new IllegalStateException("No embedded configuration defined");
139        }
140        if (!Framework.isTestModeSet()) {
141            log.warn("Elasticsearch embedded configuration is ONLY for testing"
142                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
143        }
144        Builder sBuilder = ImmutableSettings.settingsBuilder();
145        sBuilder.put("http.enabled", conf.httpEnabled()).put("network.host", conf.getNetworkHost()).put("path.data",
146                conf.getDataPath()).put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put(
147                "cluster.name", conf.getClusterName()).put("node.name", conf.getNodeName()).put(
148                "http.netty.worker_count", 4).put("http.cors.enabled", true).put(
149                "cluster.routing.allocation.disk.threshold_enabled", false);
150        if (conf.getIndexStorageType() != null) {
151            sBuilder.put("index.store.type", conf.getIndexStorageType());
152            if (conf.getIndexStorageType().equals("memory")) {
153                sBuilder.put("gateway.type", "none");
154            }
155        }
156        Settings settings = sBuilder.build();
157        log.debug("Using settings: " + settings.toDelimitedString(','));
158        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
159        assert ret != null : "Can not create an embedded ES Node";
160        return ret;
161    }
162
163    private Client connectToEmbedded() {
164        log.info("Connecting to embedded ES");
165        Client ret = localNode.start().client();
166        assert ret != null : "Can not connect to embedded ES Node";
167        return ret;
168    }
169
170    private Client connectToRemote(ElasticSearchRemoteConfig config) {
171        log.info("Connecting to remote ES cluster: " + config);
172        Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", config.getClusterName()).put(
173                "client.transport.nodes_sampler_interval", config.getSamplerInterval()).put(
174                "client.transport.ping_timeout", config.getPingTimeout()).put("client.transport.ignore_cluster_name",
175                config.isIgnoreClusterName()).put("client.transport.sniff", config.isClusterSniff());
176        Settings settings = builder.build();
177        if (log.isDebugEnabled()) {
178            log.debug("Using settings: " + settings.toDelimitedString(','));
179        }
180        TransportClient ret = new TransportClient(settings);
181        String[] addresses = config.getAddresses();
182        if (addresses == null) {
183            log.error("You need to provide an addressList to join a cluster");
184        } else {
185            for (String item : config.getAddresses()) {
186                String[] address = item.split(":");
187                log.debug("Add transport address: " + item);
188                try {
189                    InetAddress inet = InetAddress.getByName(address[0]);
190                    ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
191                } catch (UnknownHostException e) {
192                    log.error("Unable to resolve host " + address[0], e);
193                }
194            }
195        }
196        assert ret != null : "Unable to create a remote client";
197        return ret;
198    }
199
200    private void checkClusterHealth(String... indexNames) {
201        if (client == null) {
202            throw new IllegalStateException("No es client available");
203        }
204        String errorMessage = null;
205        try {
206            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
207            ClusterHealthResponse ret = client.admin().cluster().prepareHealth(indexNames).setTimeout(
208                    TIMEOUT_WAIT_FOR_CLUSTER).setWaitForYellowStatus().get();
209            if (ret.isTimedOut()) {
210                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
211                        + ret;
212            } else {
213                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
214                    log.warn("Es Cluster ready but not GREEN: " + ret);
215                } else {
216                    log.info("ES Cluster ready: " + ret);
217                }
218            }
219        } catch (NoNodeAvailableException e) {
220            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
221        }
222        if (errorMessage != null) {
223            log.error(errorMessage);
224            throw new RuntimeException(errorMessage);
225        }
226    }
227
228    private void initializeIndexes() {
229        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
230            if (conf.isDocumentIndex()) {
231                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
232                indexNames.put(conf.getRepositoryName(), conf.getName());
233                repoNames.put(conf.getName(), conf.getRepositoryName());
234                Set<String> set = new LinkedHashSet<>();
235                if (includeSourceFields != null) {
236                    set.addAll(Arrays.asList(includeSourceFields));
237                }
238                set.addAll(Arrays.asList(conf.getIncludes()));
239                if (set.contains(ALL_FIELDS)) {
240                    set.clear();
241                    set.add(ALL_FIELDS);
242                }
243                includeSourceFields = set.toArray(new String[set.size()]);
244                set.clear();
245                if (excludeSourceFields != null) {
246                    set.addAll(Arrays.asList(excludeSourceFields));
247                }
248                set.addAll(Arrays.asList(conf.getExcludes()));
249                excludeSourceFields = set.toArray(new String[set.size()]);
250            }
251
252        }
253        initIndexes(false);
254    }
255
256    // Admin Impl =============================================================
257    @Override
258    public void refreshRepositoryIndex(String repositoryName) {
259        if (log.isDebugEnabled()) {
260            log.debug("Refreshing index associated with repo: " + repositoryName);
261        }
262        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
263        if (log.isDebugEnabled()) {
264            log.debug("Refreshing index done");
265        }
266    }
267
268    @Override
269    public String getIndexNameForRepository(String repositoryName) {
270        String ret = indexNames.get(repositoryName);
271        if (ret == null) {
272            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
273        }
274        return ret;
275    }
276
277    @Override
278    public List<String> getIndexNamesForType(String type) {
279        List<String> indexNames = new ArrayList<>();
280        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
281            if (type.equals(conf.getType())) {
282                indexNames.add(conf.getName());
283            }
284        }
285        return indexNames;
286    }
287
288    @Override
289    public String getIndexNameForType(String type) {
290        List<String> indexNames = getIndexNamesForType(type);
291        if (indexNames.isEmpty()) {
292            throw new NoSuchElementException("No index defined for type: " + type);
293        }
294        return indexNames.get(0);
295    }
296
297    @Override
298    public void flushRepositoryIndex(String repositoryName) {
299        log.warn("Flushing index associated with repo: " + repositoryName);
300        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
301        log.info("Flushing index done");
302    }
303
304    @Override
305    public void refresh() {
306        for (String repositoryName : indexNames.keySet()) {
307            refreshRepositoryIndex(repositoryName);
308        }
309    }
310
311    @Override
312    public void flush() {
313        for (String repositoryName : indexNames.keySet()) {
314            flushRepositoryIndex(repositoryName);
315        }
316    }
317
318    @Override
319    public void optimizeIndex(String indexName) {
320        log.warn("Optimizing index: " + indexName);
321        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
322            if (conf.getName().equals(indexName)) {
323                getClient().admin().indices().prepareOptimize(indexName).get();
324            }
325        }
326        log.info("Optimize done");
327    }
328
329    @Override
330    public void optimizeRepositoryIndex(String repositoryName) {
331        optimizeIndex(getIndexNameForRepository(repositoryName));
332    }
333
334    @Override
335    public void optimize() {
336        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
337            optimizeIndex(conf.getName());
338        }
339    }
340
341    @Override
342    public Client getClient() {
343        return client;
344    }
345
346    @Override
347    public void initIndexes(boolean dropIfExists) {
348        indexInitDone = false;
349        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
350            initIndex(conf, dropIfExists);
351        }
352        log.info("ES Service ready");
353        indexInitDone = true;
354    }
355
356    @Override
357    public void dropAndInitIndex(String indexName) {
358        log.info("Drop and init index: " + indexName);
359        indexInitDone = false;
360        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
361            if (conf.getName().equals(indexName)) {
362                initIndex(conf, true);
363            }
364        }
365        indexInitDone = true;
366    }
367
368    @Override
369    public void dropAndInitRepositoryIndex(String repositoryName) {
370        log.info("Drop and init index of repository: " + repositoryName);
371        indexInitDone = false;
372        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
373            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
374                initIndex(conf, true);
375            }
376        }
377        indexInitDone = true;
378    }
379
380    @Override
381    public List<String> getRepositoryNames() {
382        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
383    }
384
385    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
386        if (!conf.mustCreate()) {
387            return;
388        }
389        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
390        boolean mappingExists = false;
391        boolean indexExists = getClient().admin().indices().prepareExists(conf.getName()).execute().actionGet().isExists();
392        if (indexExists) {
393            if (!dropIfExists) {
394                log.debug("Index " + conf.getName() + " already exists");
395                mappingExists = getClient().admin().indices().prepareGetMappings(conf.getName()).execute().actionGet().getMappings().get(
396                        conf.getName()).containsKey(conf.getType());
397            } else {
398                if (!Framework.isTestModeSet()) {
399                    log.warn(String.format("Initializing index: %s, type: %s with "
400                            + "dropIfExists flag, deleting an existing index", conf.getName(), conf.getType()));
401                }
402                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
403                indexExists = false;
404            }
405        }
406        if (!indexExists) {
407            log.info(String.format("Creating index: %s", conf.getName()));
408            if (log.isDebugEnabled()) {
409                log.debug("Using settings: " + conf.getSettings());
410            }
411            getClient().admin().indices().prepareCreate(conf.getName()).setSettings(conf.getSettings()).execute().actionGet();
412        }
413        if (!mappingExists) {
414            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
415            if (log.isDebugEnabled()) {
416                log.debug("Using mapping: " + conf.getMapping());
417            }
418            getClient().admin().indices().preparePutMapping(conf.getName()).setType(conf.getType()).setSource(
419                    conf.getMapping()).execute().actionGet();
420            if (!dropIfExists && conf.getRepositoryName() != null) {
421                repositoryInitialized.add(conf.getRepositoryName());
422            }
423        }
424        // make sure the index is ready before returning
425        checkClusterHealth(conf.getName());
426    }
427
428    @Override
429    public int getPendingWorkerCount() {
430        // impl of scheduling is left to the ESService
431        throw new UnsupportedOperationException("Not implemented");
432    }
433
434    @Override
435    public int getRunningWorkerCount() {
436        // impl of scheduling is left to the ESService
437        throw new UnsupportedOperationException("Not implemented");
438    }
439
440    @Override
441    public int getTotalCommandProcessed() {
442        return totalCommandProcessed.get();
443    }
444
445    @Override
446    public boolean isEmbedded() {
447        return embedded;
448    }
449
450    @Override
451    public boolean isIndexingInProgress() {
452        // impl of scheduling is left to the ESService
453        throw new UnsupportedOperationException("Not implemented");
454    }
455
456    @Override
457    public ListenableFuture<Boolean> prepareWaitForIndexing() {
458        throw new UnsupportedOperationException("Not implemented");
459    }
460
461    /**
462     * Get the elastic search indexes for searches
463     */
464    String[] getSearchIndexes(List<String> searchRepositories) {
465        if (searchRepositories.isEmpty()) {
466            Collection<String> values = indexNames.values();
467            return values.toArray(new String[values.size()]);
468        }
469        String[] ret = new String[searchRepositories.size()];
470        int i = 0;
471        for (String repo : searchRepositories) {
472            ret[i++] = getIndexNameForRepository(repo);
473        }
474        return ret;
475    }
476
477    public boolean isReady() {
478        return indexInitDone;
479    }
480
481    String[] getIncludeSourceFields() {
482        return includeSourceFields;
483    }
484
485    String[] getExcludeSourceFields() {
486        return excludeSourceFields;
487    }
488
489    Map<String, String> getRepositoryMap() {
490        return repoNames;
491    }
492
493    /**
494     * Get the list of repository names that have their index created.
495     */
496    public List<String> getInitializedRepositories() {
497        return repositoryInitialized;
498    }
499}