001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     bdelbosc
017 */
018
019package org.nuxeo.elasticsearch.core;
020
021import static org.nuxeo.elasticsearch.ElasticSearchConstants.ALL_FIELDS;
022
023import java.net.InetAddress;
024import java.net.UnknownHostException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.LinkedHashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.NoSuchElementException;
034import java.util.Set;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
040import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
041import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
042import org.elasticsearch.client.Client;
043import org.elasticsearch.client.transport.NoNodeAvailableException;
044import org.elasticsearch.client.transport.TransportClient;
045import org.elasticsearch.common.settings.ImmutableSettings;
046import org.elasticsearch.common.settings.ImmutableSettings.Builder;
047import org.elasticsearch.common.settings.Settings;
048import org.elasticsearch.common.transport.InetSocketTransportAddress;
049import org.elasticsearch.node.Node;
050import org.elasticsearch.node.NodeBuilder;
051import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
052import org.nuxeo.elasticsearch.config.ElasticSearchIndexConfig;
053import org.nuxeo.elasticsearch.config.ElasticSearchLocalConfig;
054import org.nuxeo.elasticsearch.config.ElasticSearchRemoteConfig;
055import org.nuxeo.runtime.api.Framework;
056
057import com.google.common.util.concurrent.ListenableFuture;
058
059/**
060 * @since 6.0
061 */
062public class ElasticSearchAdminImpl implements ElasticSearchAdmin {
063    private static final Log log = LogFactory.getLog(ElasticSearchAdminImpl.class);
064
065    private static final String TIMEOUT_WAIT_FOR_CLUSTER = "30s";
066
067    final AtomicInteger totalCommandProcessed = new AtomicInteger(0);
068
069    private final Map<String, String> indexNames = new HashMap<>();
070
071    private final Map<String, String> repoNames = new HashMap<>();
072
073    private final Map<String, ElasticSearchIndexConfig> indexConfig;
074
075    private Node localNode;
076
077    private Client client;
078
079    private boolean indexInitDone = false;
080
081    private final ElasticSearchLocalConfig localConfig;
082
083    private final ElasticSearchRemoteConfig remoteConfig;
084
085    private String[] includeSourceFields;
086
087    private String[] excludeSourceFields;
088
089    private boolean embedded = true;
090
091    private List<String> repositoryInitialized = new ArrayList<>();
092
093    /**
094     * Init the admin service, remote configuration if not null will take precedence over local embedded configuration.
095     */
096    public ElasticSearchAdminImpl(ElasticSearchLocalConfig localConfig, ElasticSearchRemoteConfig remoteConfig,
097            Map<String, ElasticSearchIndexConfig> indexConfig) {
098        this.remoteConfig = remoteConfig;
099        this.localConfig = localConfig;
100        this.indexConfig = indexConfig;
101        connect();
102        initializeIndexes();
103    }
104
105    private void connect() {
106        if (client != null) {
107            return;
108        }
109        if (remoteConfig != null) {
110            client = connectToRemote(remoteConfig);
111            embedded = false;
112        } else {
113            localNode = createEmbeddedNode(localConfig);
114            client = connectToEmbedded();
115            embedded = true;
116        }
117        checkClusterHealth();
118        log.info("ES Connected");
119    }
120
121    public void disconnect() {
122        if (client != null) {
123            client.close();
124            client = null;
125            indexInitDone = false;
126            log.info("ES Disconnected");
127        }
128        if (localNode != null) {
129            localNode.close();
130            localNode = null;
131            log.info("ES embedded Node Stopped");
132        }
133    }
134
135    private Node createEmbeddedNode(ElasticSearchLocalConfig conf) {
136        log.info("ES embedded Node Initializing (local in JVM)");
137        if (conf == null) {
138            throw new IllegalStateException("No embedded configuration defined");
139        }
140        if (!Framework.isTestModeSet()) {
141            log.warn("Elasticsearch embedded configuration is ONLY for testing"
142                    + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production.");
143        }
144        Builder sBuilder = ImmutableSettings.settingsBuilder();
145        sBuilder.put("http.enabled", conf.httpEnabled()).put("network.host", conf.getNetworkHost()).put("path.data",
146                conf.getDataPath()).put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put(
147                        "cluster.name", conf.getClusterName()).put("node.name", conf.getNodeName()).put(
148                                "http.netty.worker_count", 4).put("http.cors.enabled", true).put(
149                                        "cluster.routing.allocation.disk.threshold_enabled", false);
150        if (conf.getIndexStorageType() != null) {
151            sBuilder.put("index.store.type", conf.getIndexStorageType());
152            if (conf.getIndexStorageType().equals("memory")) {
153                sBuilder.put("gateway.type", "none");
154            }
155        }
156        Settings settings = sBuilder.build();
157        log.debug("Using settings: " + settings.toDelimitedString(','));
158        Node ret = NodeBuilder.nodeBuilder().local(true).settings(settings).node();
159        assert ret != null : "Can not create an embedded ES Node";
160        return ret;
161    }
162
163    private Client connectToEmbedded() {
164        log.info("Connecting to embedded ES");
165        Client ret = localNode.start().client();
166        assert ret != null : "Can not connect to embedded ES Node";
167        return ret;
168    }
169
170    private Client connectToRemote(ElasticSearchRemoteConfig config) {
171        log.info("Connecting to remote ES cluster: " + config);
172        Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", config.getClusterName()).put(
173                "client.transport.nodes_sampler_interval", config.getSamplerInterval()).put(
174                "client.transport.ping_timeout", config.getPingTimeout()).put("client.transport.ignore_cluster_name",
175                config.isIgnoreClusterName()).put("client.transport.sniff", config.isClusterSniff());
176        Settings settings = builder.build();
177        if (log.isDebugEnabled()) {
178            log.debug("Using settings: " + settings.toDelimitedString(','));
179        }
180        TransportClient ret = new TransportClient(settings);
181        String[] addresses = config.getAddresses();
182        if (addresses == null) {
183            log.error("You need to provide an addressList to join a cluster");
184        } else {
185            for (String item : config.getAddresses()) {
186                String[] address = item.split(":");
187                log.debug("Add transport address: " + item);
188                try {
189                    InetAddress inet = InetAddress.getByName(address[0]);
190                    ret.addTransportAddress(new InetSocketTransportAddress(inet, Integer.parseInt(address[1])));
191                } catch (UnknownHostException e) {
192                    log.error("Unable to resolve host " + address[0], e);
193                }
194            }
195        }
196        assert ret != null : "Unable to create a remote client";
197        return ret;
198    }
199
200    private void checkClusterHealth(String... indexNames) {
201        if (client == null) {
202            throw new IllegalStateException("No es client available");
203        }
204        String errorMessage = null;
205        try {
206            log.debug("Waiting for cluster yellow health status, indexes: " + Arrays.toString(indexNames));
207            ClusterHealthResponse ret = client.admin().cluster().prepareHealth(indexNames).setTimeout(
208                    TIMEOUT_WAIT_FOR_CLUSTER).setWaitForYellowStatus().get();
209            if (ret.isTimedOut()) {
210                errorMessage = "ES Cluster health status not Yellow after " + TIMEOUT_WAIT_FOR_CLUSTER + " give up: "
211                        + ret;
212            } else {
213                if ((indexNames.length > 0) && ret.getStatus() != ClusterHealthStatus.GREEN) {
214                    log.warn("Es Cluster ready but not GREEN: " + ret);
215                } else {
216                    log.info("ES Cluster ready: " + ret);
217                }
218            }
219        } catch (NoNodeAvailableException e) {
220            errorMessage = "Failed to connect to elasticsearch, check addressList and clusterName: " + e.getMessage();
221        }
222        if (errorMessage != null) {
223            log.error(errorMessage);
224            throw new RuntimeException(errorMessage);
225        }
226    }
227
228    private void initializeIndexes() {
229        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
230            if (conf.isDocumentIndex()) {
231                log.info("Associate index " + conf.getName() + " with repository: " + conf.getRepositoryName());
232                indexNames.put(conf.getRepositoryName(), conf.getName());
233                repoNames.put(conf.getName(), conf.getRepositoryName());
234                Set<String> set = new LinkedHashSet<>();
235                if (includeSourceFields != null) {
236                    set.addAll(Arrays.asList(includeSourceFields));
237                }
238                set.addAll(Arrays.asList(conf.getIncludes()));
239                if (set.contains(ALL_FIELDS)) {
240                    set.clear();
241                    set.add(ALL_FIELDS);
242                }
243                includeSourceFields = set.toArray(new String[set.size()]);
244                set.clear();
245                if (excludeSourceFields != null) {
246                    set.addAll(Arrays.asList(excludeSourceFields));
247                }
248                set.addAll(Arrays.asList(conf.getExcludes()));
249                excludeSourceFields = set.toArray(new String[set.size()]);
250            }
251
252        }
253        initIndexes(false);
254    }
255
256    // Admin Impl =============================================================
257    @Override
258    public void refreshRepositoryIndex(String repositoryName) {
259        if (log.isDebugEnabled()) {
260            log.debug("Refreshing index associated with repo: " + repositoryName);
261        }
262        getClient().admin().indices().prepareRefresh(getIndexNameForRepository(repositoryName)).execute().actionGet();
263        if (log.isDebugEnabled()) {
264            log.debug("Refreshing index done");
265        }
266    }
267
268    @Override
269    public String getIndexNameForRepository(String repositoryName) {
270        String ret = indexNames.get(repositoryName);
271        if (ret == null) {
272            throw new NoSuchElementException("No index defined for repository: " + repositoryName);
273        }
274        return ret;
275    }
276
277    @Override
278    public void flushRepositoryIndex(String repositoryName) {
279        log.warn("Flushing index associated with repo: " + repositoryName);
280        getClient().admin().indices().prepareFlush(getIndexNameForRepository(repositoryName)).execute().actionGet();
281        log.info("Flushing index done");
282    }
283
284    @Override
285    public void refresh() {
286        for (String repositoryName : indexNames.keySet()) {
287            refreshRepositoryIndex(repositoryName);
288        }
289    }
290
291    @Override
292    public void flush() {
293        for (String repositoryName : indexNames.keySet()) {
294            flushRepositoryIndex(repositoryName);
295        }
296    }
297
298    @Override
299    public void optimizeIndex(String indexName) {
300        log.warn("Optimizing index: " + indexName);
301        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
302            if (conf.getName().equals(indexName)) {
303                getClient().admin().indices().prepareOptimize(indexName).get();
304            }
305        }
306        log.info("Optimize done");
307    }
308
309    @Override
310    public void optimizeRepositoryIndex(String repositoryName) {
311        optimizeIndex(getIndexNameForRepository(repositoryName));
312    }
313
314    @Override
315    public void optimize() {
316        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
317            optimizeIndex(conf.getName());
318        }
319    }
320
321    @Override
322    public Client getClient() {
323        return client;
324    }
325
326    @Override
327    public void initIndexes(boolean dropIfExists) {
328        indexInitDone = false;
329        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
330            initIndex(conf, dropIfExists);
331        }
332        log.info("ES Service ready");
333        indexInitDone = true;
334    }
335
336    @Override
337    public void dropAndInitIndex(String indexName) {
338        log.info("Drop and init index: " + indexName);
339        indexInitDone = false;
340        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
341            if (conf.getName().equals(indexName)) {
342                initIndex(conf, true);
343            }
344        }
345        indexInitDone = true;
346    }
347
348    @Override
349    public void dropAndInitRepositoryIndex(String repositoryName) {
350        log.info("Drop and init index of repository: " + repositoryName);
351        indexInitDone = false;
352        for (ElasticSearchIndexConfig conf : indexConfig.values()) {
353            if (conf.isDocumentIndex() && repositoryName.equals(conf.getRepositoryName())) {
354                initIndex(conf, true);
355            }
356        }
357        indexInitDone = true;
358    }
359
360    @Override
361    public List<String> getRepositoryNames() {
362        return Collections.unmodifiableList(new ArrayList<>(indexNames.keySet()));
363    }
364
365    void initIndex(ElasticSearchIndexConfig conf, boolean dropIfExists) {
366        if (!conf.mustCreate()) {
367            return;
368        }
369        log.info(String.format("Initialize index: %s, type: %s", conf.getName(), conf.getType()));
370        boolean mappingExists = false;
371        boolean indexExists = getClient().admin().indices().prepareExists(conf.getName()).execute().actionGet().isExists();
372        if (indexExists) {
373            if (!dropIfExists) {
374                log.debug("Index " + conf.getName() + " already exists");
375                mappingExists = getClient().admin().indices().prepareGetMappings(conf.getName()).execute().actionGet().getMappings().get(
376                        conf.getName()).containsKey(conf.getType());
377            } else {
378                if (!Framework.isTestModeSet()) {
379                    log.warn(String.format("Initializing index: %s, type: %s with "
380                            + "dropIfExists flag, deleting an existing index", conf.getName(), conf.getType()));
381                }
382                getClient().admin().indices().delete(new DeleteIndexRequest(conf.getName())).actionGet();
383                indexExists = false;
384            }
385        }
386        if (!indexExists) {
387            log.info(String.format("Creating index: %s", conf.getName()));
388            if (log.isDebugEnabled()) {
389                log.debug("Using settings: " + conf.getSettings());
390            }
391            getClient().admin().indices().prepareCreate(conf.getName()).setSettings(conf.getSettings()).execute().actionGet();
392        }
393        if (!mappingExists) {
394            log.info(String.format("Creating mapping type: %s on index: %s", conf.getType(), conf.getName()));
395            if (log.isDebugEnabled()) {
396                log.debug("Using mapping: " + conf.getMapping());
397            }
398            getClient().admin().indices().preparePutMapping(conf.getName()).setType(conf.getType()).setSource(
399                    conf.getMapping()).execute().actionGet();
400            if (!dropIfExists && conf.getRepositoryName() != null) {
401                repositoryInitialized.add(conf.getRepositoryName());
402            }
403        }
404        // make sure the index is ready before returning
405        checkClusterHealth(conf.getName());
406    }
407
408    @Override
409    public int getPendingWorkerCount() {
410        // impl of scheduling is left to the ESService
411        throw new UnsupportedOperationException("Not implemented");
412    }
413
414    @Override
415    public int getRunningWorkerCount() {
416        // impl of scheduling is left to the ESService
417        throw new UnsupportedOperationException("Not implemented");
418    }
419
420    @Override
421    public int getTotalCommandProcessed() {
422        return totalCommandProcessed.get();
423    }
424
425    @Override
426    public boolean isEmbedded() {
427        return embedded;
428    }
429
430    @Override
431    public boolean isIndexingInProgress() {
432        // impl of scheduling is left to the ESService
433        throw new UnsupportedOperationException("Not implemented");
434    }
435
436    @Override
437    public ListenableFuture<Boolean> prepareWaitForIndexing() {
438        throw new UnsupportedOperationException("Not implemented");
439    }
440
441    /**
442     * Get the elastic search indexes for searches
443     */
444    String[] getSearchIndexes(List<String> searchRepositories) {
445        if (searchRepositories.isEmpty()) {
446            Collection<String> values = indexNames.values();
447            return values.toArray(new String[values.size()]);
448        }
449        String[] ret = new String[searchRepositories.size()];
450        int i = 0;
451        for (String repo : searchRepositories) {
452            ret[i++] = getIndexNameForRepository(repo);
453        }
454        return ret;
455    }
456
457    public boolean isReady() {
458        return indexInitDone;
459    }
460
461    String[] getIncludeSourceFields() {
462        return includeSourceFields;
463    }
464
465    String[] getExcludeSourceFields() {
466        return excludeSourceFields;
467    }
468
469    Map<String, String> getRepositoryMap() {
470        return repoNames;
471    }
472
473    /**
474     * Get the list of repository names that have their index created.
475     */
476    public List<String> getInitializedRepositories() {
477        return repositoryInitialized;
478    }
479}