001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Nuxeo 016 */ 017package org.nuxeo.elasticsearch.web.admin; 018 019import static org.jboss.seam.ScopeType.EVENT; 020 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.List; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 029import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; 030import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; 031import org.elasticsearch.action.count.CountResponse; 032import org.elasticsearch.index.query.QueryBuilders; 033import org.jboss.seam.annotations.In; 034import org.jboss.seam.annotations.Name; 035import org.jboss.seam.annotations.Scope; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModel; 039import org.nuxeo.ecm.core.api.DocumentRef; 040import org.nuxeo.ecm.core.api.IdRef; 041import org.nuxeo.ecm.platform.query.api.PageProvider; 042import org.nuxeo.ecm.platform.query.api.PageProviderDefinition; 043import org.nuxeo.ecm.platform.query.api.PageProviderService; 044import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 045import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 046import org.nuxeo.elasticsearch.commands.IndexingCommand; 047import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.metrics.MetricsService; 050 051import com.codahale.metrics.MetricRegistry; 052import com.codahale.metrics.SharedMetricRegistries; 053import com.codahale.metrics.Timer; 054 055/** 056 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a> 057 */ 058@Name("esAdmin") 059@Scope(EVENT) 060public class ElasticSearchManager { 061 062 private static final Log log = LogFactory.getLog(ElasticSearchManager.class); 063 064 private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document"; 065 066 private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}"; 067 068 private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo"; 069 070 @In(create = true) 071 protected ElasticSearchAdmin esa; 072 073 @In(create = true) 074 protected ElasticSearchIndexing esi; 075 076 @In(create = true, required = false) 077 protected transient CoreSession documentManager; 078 079 protected List<PageProviderStatus> ppStatuses = null; 080 081 protected Timer indexTimer; 082 083 protected Timer bulkIndexTimer; 084 085 private String rootId; 086 087 private String nxql = DEFAULT_NXQL_QUERY; 088 089 private String repositoryName; 090 091 private Boolean dropIndex = false; 092 093 public String getNodesInfo() { 094 NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet(); 095 return nodesInfo.toString(); 096 } 097 098 public String getNodesStats() { 099 NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet(); 100 return stats.toString(); 101 } 102 103 public String getNodesHealth() { 104 String[] indices = getIndexNames(); 105 ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get(); 106 return health.toString(); 107 } 108 109 public void startReindexAll() { 110 log.warn("Re-indexing the entire repository: " + repositoryName); 111 esa.dropAndInitRepositoryIndex(repositoryName); 112 esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 113 } 114 115 public void startReindexNxql() { 116 log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName)); 117 esi.runReindexingWorker(repositoryName, getNxql()); 118 } 119 120 public void startReindexFrom() { 121 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 122 log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, repositoryName)); 123 String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName); 124 IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd); 125 esi.indexNonRecursive(rmCmd); 126 127 DocumentRef ref = new IdRef(rootId); 128 if (session.exists(ref)) { 129 DocumentModel doc = session.getDocument(ref); 130 log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc, 131 repositoryName)); 132 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true); 133 esi.runIndexingWorker(Arrays.asList(cmd)); 134 } 135 } 136 } 137 138 public void flush() { 139 esa.flush(); 140 } 141 142 public void optimize() { 143 esa.optimize(); 144 } 145 146 protected void introspectPageProviders() { 147 148 ppStatuses = new ArrayList<>(); 149 150 PageProviderService pps = Framework.getLocalService(PageProviderService.class); 151 for (String ppName : pps.getPageProviderDefinitionNames()) { 152 PageProviderDefinition def = pps.getPageProviderDefinition(ppName); 153 // Create an instance so class replacer is taken in account 154 PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null); 155 String klass = pp.getClass().getCanonicalName(); 156 ppStatuses.add(new PageProviderStatus(ppName, klass)); 157 } 158 Collections.sort(ppStatuses); 159 } 160 161 public List<PageProviderStatus> getContentViewStatus() { 162 if (ppStatuses == null) { 163 introspectPageProviders(); 164 } 165 return ppStatuses; 166 } 167 168 public Boolean isIndexingInProgress() { 169 return esa.isIndexingInProgress(); 170 } 171 172 public Boolean displayClusterInfo() { 173 if (esa.isEmbedded()) { 174 return true; 175 } 176 return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false")); 177 } 178 179 public String getPendingWorkerCount() { 180 return Integer.valueOf(esa.getPendingWorkerCount()).toString(); 181 } 182 183 public String getRunningWorkerCount() { 184 return Integer.valueOf(esa.getRunningWorkerCount()).toString(); 185 } 186 187 public String getTotalCommandProcessed() { 188 return Integer.valueOf(esa.getTotalCommandProcessed()).toString(); 189 } 190 191 public String getNumberOfDocuments() { 192 String[] indices = getIndexNames(); 193 CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get(); 194 return Long.valueOf(ret.getCount()).toString(); 195 } 196 197 private String[] getIndexNames() { 198 String indices[] = new String[esa.getRepositoryNames().size()]; 199 int i=0; 200 for (String repo : esa.getRepositoryNames()) { 201 indices[i++] = esa.getIndexNameForRepository(repo); 202 } 203 return indices; 204 } 205 206 public String getIndexingRates() { 207 if (indexTimer == null) { 208 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 209 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 210 211 } 212 return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(), 213 indexTimer.getFifteenMinuteRate()); 214 } 215 216 public String getBulkIndexingRates() { 217 if (bulkIndexTimer == null) { 218 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 219 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 220 221 } 222 return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(), 223 bulkIndexTimer.getFifteenMinuteRate()); 224 } 225 226 public String getRootId() { 227 return rootId; 228 } 229 230 public List<String> getRepositoryNames() { 231 return esa.getRepositoryNames(); 232 } 233 234 public void setRootId(String rootId) { 235 this.rootId = rootId; 236 } 237 238 public String getNxql() { 239 return nxql; 240 } 241 242 public void setNxql(String nxql) { 243 this.nxql = nxql; 244 } 245 246 public String getRepositoryName() { 247 return repositoryName; 248 } 249 250 public void setRepositoryName(String repositoryName) { 251 this.repositoryName = repositoryName; 252 } 253 254 public Boolean getDropIndex() { 255 return dropIndex; 256 } 257 258 public void setDropIndex(Boolean dropIndex) { 259 this.dropIndex = dropIndex; 260 } 261}