/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Nuxeo
 */
package org.nuxeo.elasticsearch.web.admin;

import static org.jboss.seam.ScopeType.EVENT;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.jboss.seam.annotations.In;
import org.jboss.seam.annotations.Name;
import org.jboss.seam.annotations.Scope;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.platform.query.api.PageProvider;
import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
import org.nuxeo.ecm.platform.query.api.PageProviderService;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

/**
 * Seam component (name {@code esAdmin}, {@code EVENT} scope) exposing Elasticsearch administration actions and status
 * values: cluster information, re-indexing triggers, worker counters, document count and indexing rates.
 *
 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
 */
@Name("esAdmin")
@Scope(EVENT)
public class ElasticSearchManager {

    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);

    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";

    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";

    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";

    @In(create = true)
    protected ElasticSearchAdmin esa;

    @In(create = true)
    protected ElasticSearchIndexing esi;

    @In(create = true, required = false)
    protected transient CoreSession documentManager;

    /** Lazily built by {@link #getContentViewStatus()}; {@code null} until first requested. */
    protected List<PageProviderStatus> ppStatuses = null;

    /** Lazily resolved by {@link #getIndexingRates()}. */
    protected Timer indexTimer;

    /** Lazily resolved by {@link #getBulkIndexingRates()}. */
    protected Timer bulkIndexTimer;

    private String rootId;

    private String nxql = DEFAULT_NXQL_QUERY;

    private String repositoryName;

    private Boolean dropIndex = false;

    /**
     * @return the string form of the cluster "nodes info" response
     */
    public String getNodesInfo() {
        NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet();
        return nodesInfo.toString();
    }

    /**
     * @return the string form of the cluster "nodes stats" response
     */
    public String getNodesStats() {
        NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet();
        return stats.toString();
    }

    /**
     * @return the string form of the cluster health response, restricted to the indices returned by
     *         {@link #getIndexNames()}
     */
    public String getNodesHealth() {
        String[] indices = getIndexNames();
        ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get();
        return health.toString();
    }

    /**
     * Drops and re-initializes the index of the selected repository, then starts a re-indexing worker over
     * {@code SELECT ecm:uuid FROM Document}.
     */
    public void startReindexAll() {
        log.warn("Re-indexing the entire repository: " + repositoryName);
        esa.dropAndInitRepositoryIndex(repositoryName);
        esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
    }

    /**
     * Starts a re-indexing worker on the selected repository using the configured NXQL query.
     */
    public void startReindexNxql() {
        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
        esi.runReindexingWorker(repositoryName, getNxql());
    }

    /**
     * Issues a recursive DELETE indexing command for the document identified by {@code rootId}, then, if that
     * document exists in the repository, schedules an INSERT indexing command for it through the indexing worker.
     */
    public void startReindexFrom() {
        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, repositoryName));
            // NOTE(review): rootId and repositoryName are interpolated into the JSON template without escaping;
            // confirm that callers can only supply plain identifiers.
            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
            esi.indexNonRecursive(rmCmd);

            DocumentRef ref = new IdRef(rootId);
            if (session.exists(ref)) {
                DocumentModel doc = session.getDocument(ref);
                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
                        repositoryName));
                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
                esi.runIndexingWorker(Arrays.asList(cmd));
            }
        }
    }

    /**
     * Delegates to {@link ElasticSearchAdmin#flush()}.
     */
    public void flush() {
        esa.flush();
    }

    /**
     * Delegates to {@link ElasticSearchAdmin#optimize()}.
     */
    public void optimize() {
        esa.optimize();
    }

    /**
     * Rebuilds {@link #ppStatuses}: one entry per registered page provider definition, holding its name and the
     * canonical class name of an instance obtained from the {@link PageProviderService}, sorted with
     * {@link Collections#sort(List)}.
     */
    protected void introspectPageProviders() {

        ppStatuses = new ArrayList<>();

        PageProviderService pps = Framework.getLocalService(PageProviderService.class);
        for (String ppName : pps.getPageProviderDefinitionNames()) {
            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
            // Create an instance so class replacer is taken in account
            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
            String klass = pp.getClass().getCanonicalName();
            ppStatuses.add(new PageProviderStatus(ppName, klass));
        }
        Collections.sort(ppStatuses);
    }

    /**
     * @return the page provider statuses, computed on first call and cached in {@link #ppStatuses} afterwards
     */
    public List<PageProviderStatus> getContentViewStatus() {
        if (ppStatuses == null) {
            introspectPageProviders();
        }
        return ppStatuses;
    }

    /**
     * @return the value reported by {@link ElasticSearchAdmin#isIndexingInProgress()}
     */
    public Boolean isIndexingInProgress() {
        return esa.isIndexingInProgress();
    }

    /**
     * @return {@code true} when Elasticsearch is embedded; otherwise the boolean value of the
     *         {@code elasticsearch.adminCenter.displayClusterInfo} framework property (defaults to {@code false})
     */
    public Boolean displayClusterInfo() {
        if (esa.isEmbedded()) {
            return true;
        }
        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
    }

    /**
     * @return the pending worker count reported by the admin service, as a decimal string
     */
    public String getPendingWorkerCount() {
        return String.valueOf(esa.getPendingWorkerCount());
    }

    /**
     * @return the running worker count reported by the admin service, as a decimal string
     */
    public String getRunningWorkerCount() {
        return String.valueOf(esa.getRunningWorkerCount());
    }

    /**
     * @return the total number of processed commands reported by the admin service, as a decimal string
     */
    public String getTotalCommandProcessed() {
        return String.valueOf(esa.getTotalCommandProcessed());
    }

    /**
     * @return the result of a match-all count query over the indices returned by {@link #getIndexNames()}, as a
     *         decimal string
     */
    public String getNumberOfDocuments() {
        String[] indices = getIndexNames();
        CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get();
        return String.valueOf(ret.getCount());
    }

    /**
     * Resolves the index name of every repository known to the admin service.
     * <p>
     * The repository list is read once so that the resulting array always matches the names that were iterated.
     *
     * @return one index name per repository, in the iteration order of the repository list
     */
    private String[] getIndexNames() {
        List<String> repositories = esa.getRepositoryNames();
        List<String> indexNames = new ArrayList<>(repositories.size());
        for (String repo : repositories) {
            indexNames.add(esa.getIndexNameForRepository(repo));
        }
        return indexNames.toArray(new String[indexNames.size()]);
    }

    /**
     * @return the one, five and fifteen minute rates of the {@code nuxeo.elasticsearch.service.index} timer,
     *         formatted as {@code "%.2f, %.2f, %.2f"}
     */
    public String getIndexingRates() {
        if (indexTimer == null) {
            indexTimer = lookupServiceTimer("index");
        }
        return formatRates(indexTimer);
    }

    /**
     * @return the one, five and fifteen minute rates of the {@code nuxeo.elasticsearch.service.bulkIndex} timer,
     *         formatted as {@code "%.2f, %.2f, %.2f"}
     */
    public String getBulkIndexingRates() {
        if (bulkIndexTimer == null) {
            bulkIndexTimer = lookupServiceTimer("bulkIndex");
        }
        return formatRates(bulkIndexTimer);
    }

    /**
     * Fetches the timer named {@code nuxeo.elasticsearch.service.<leafName>} from the shared metric registry keyed by
     * the {@link MetricsService} class name.
     */
    private static Timer lookupServiceTimer(String leafName) {
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        return registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", leafName));
    }

    /**
     * Formats the one, five and fifteen minute rates of a timer with two decimals each, comma separated.
     */
    private static String formatRates(Timer timer) {
        return String.format("%.2f, %.2f, %.2f", timer.getOneMinuteRate(), timer.getFiveMinuteRate(),
                timer.getFifteenMinuteRate());
    }

    /**
     * @return the id of the document used as root by {@link #startReindexFrom()}
     */
    public String getRootId() {
        return rootId;
    }

    /**
     * @return the repository names reported by the admin service
     */
    public List<String> getRepositoryNames() {
        return esa.getRepositoryNames();
    }

    /**
     * @param rootId the id of the document used as root by {@link #startReindexFrom()}
     */
    public void setRootId(String rootId) {
        this.rootId = rootId;
    }

    /**
     * @return the NXQL query used by {@link #startReindexNxql()}; initially {@code SELECT * FROM Document}
     */
    public String getNxql() {
        return nxql;
    }

    /**
     * @param nxql the NXQL query used by {@link #startReindexNxql()}
     */
    public void setNxql(String nxql) {
        this.nxql = nxql;
    }

    /**
     * @return the name of the repository targeted by the re-indexing actions
     */
    public String getRepositoryName() {
        return repositoryName;
    }

    /**
     * @param repositoryName the name of the repository targeted by the re-indexing actions
     */
    public void setRepositoryName(String repositoryName) {
        this.repositoryName = repositoryName;
    }

    /**
     * @return the stored "drop index" flag; initially {@code false}
     */
    public Boolean getDropIndex() {
        return dropIndex;
    }

    /**
     * @param dropIndex the "drop index" flag to store
     */
    public void setDropIndex(Boolean dropIndex) {
        this.dropIndex = dropIndex;
    }
}