001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.core; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.net.BindException; 024import java.nio.file.Files; 025import java.nio.file.Path; 026import java.nio.file.Paths; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.stream.Collectors; 031import java.util.stream.Stream; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.elasticsearch.common.settings.Settings; 036import org.elasticsearch.node.Node; 037import org.elasticsearch.node.NodeValidationException; 038import org.elasticsearch.plugins.Plugin; 039import org.elasticsearch.transport.Netty4Plugin; 040import org.nuxeo.common.utils.ExceptionUtils; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.elasticsearch.config.ElasticSearchEmbeddedServerConfig; 043import org.nuxeo.runtime.api.Framework; 044 045/** 046 * @since 9.3 047 */ 048public class ElasticSearchEmbeddedNode implements Closeable { 049 private static final Log log = LogFactory.getLog(ElasticSearchEmbeddedNode.class); 050 051 private static final int DEFAULT_RETRY = 3; 052 053 protected final ElasticSearchEmbeddedServerConfig config; 054 055 protected Node node; 056 057 protected int retry = DEFAULT_RETRY; 058 059 public ElasticSearchEmbeddedNode(ElasticSearchEmbeddedServerConfig config) { 060 this.config = config; 061 } 062 063 public void start() { 064 log.info("Starting embedded (in JVM) Elasticsearch"); 065 if (!Framework.isTestModeSet()) { 066 log.warn("Elasticsearch embedded configuration is ONLY for testing" 067 + " purpose. You need to create a dedicated Elasticsearch" + " cluster for production."); 068 } 069 Settings.Builder buidler = Settings.builder(); 070 buidler.put("http.enabled", config.httpEnabled()) 071 .put("network.host", config.getNetworkHost()) 072 .put("path.home", config.getHomePath()) 073 .put("path.data", config.getDataPath()) 074 .put("cluster.name", config.getClusterName()) 075 .put("node.name", config.getNodeName()) 076 .put("http.netty.worker_count", 4) 077 .put("http.cors.enabled", true) 078 .put("http.cors.allow-origin", "*") 079 .put("http.cors.allow-credentials", true) 080 .put("http.cors.allow-headers", "Authorization, X-Requested-With, Content-Type, Content-Length") 081 .put("cluster.routing.allocation.disk.threshold_enabled", false) 082 .put("http.port", config.getHttpPort()); 083 if (config.getIndexStorageType() != null) { 084 buidler.put("index.store.type", config.getIndexStorageType()); 085 } 086 Settings settings = buidler.build(); 087 log.debug("Using settings: " + settings.toDelimitedString(',')); 088 089 Collection<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class); 090 try { 091 node = new PluginConfigurableNode(settings, plugins); 092 node.start(); 093 // try with another home path 094 config.setHomePath(null); 095 } catch (NodeValidationException e) { 096 throw new NuxeoException("Cannot start embedded Elasticsearch: " + e.getMessage(), e); 097 } catch (Exception e) { 098 Throwable cause = ExceptionUtils.getRootCause(e); 099 if (cause != null && cause instanceof BindException) { 100 retry--; 101 log.error(String.format("Cannot bind local Elasticsearch on port %s, from %s, retry countdown: %d", 102 config.getHttpPort(), config.getDataPath(), retry)); 103 try { 104 node.close(); 105 Thread.sleep(5000); 106 } catch (InterruptedException e1) { 107 Thread.currentThread().interrupt(); 108 throw new NuxeoException(e1); 109 } catch (IOException e1) { 110 throw new NuxeoException(e1); 111 } 112 if (retry <= 0) { 113 String msg = "Not able to bind to local Elasticsearch after multiple attempts, give up"; 114 log.error(msg); 115 throw new IllegalStateException(msg); 116 } 117 start(); 118 } else { 119 throw e; 120 } 121 } 122 retry = DEFAULT_RETRY; 123 log.debug("Elasticsearch node started."); 124 } 125 126 @Override 127 public void close() throws IOException { 128 log.info("Closing embedded (in JVM) Elasticsearch"); 129 node.close(); 130 // TODO: should we delete the lock ? 131 // deleteLuceneFileLock(config.getDataPath()); 132 log.info("Node closed: " + node.isClosed()); 133 node = null; 134 } 135 136 protected void deleteLuceneFileLock(String root) throws IOException { 137 try (Stream<Path> files = Files.walk(Paths.get(root))) { 138 List<Path> locks = files.filter(f -> f.getFileName().toString().equals("node.lock")) 139 .collect(Collectors.toList()); 140 if (!locks.isEmpty()) { 141 locks.forEach(f -> log.warn("Found lock on close, deleting: " + f)); 142 locks.forEach(f -> f.toFile().delete()); 143 } 144 } 145 } 146 147 public ElasticSearchEmbeddedServerConfig getConfig() { 148 return config; 149 } 150 151 public Node getNode() { 152 return node; 153 } 154}