001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo 018 */ 019package org.nuxeo.elasticsearch.web.admin; 020 021import static org.jboss.seam.ScopeType.CONVERSATION; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.List; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 032import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; 033import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; 034import org.elasticsearch.action.count.CountResponse; 035import org.elasticsearch.index.query.QueryBuilders; 036import org.jboss.seam.annotations.In; 037import org.jboss.seam.annotations.Name; 038import org.jboss.seam.annotations.Scope; 039import org.nuxeo.ecm.core.api.CoreInstance; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.DocumentRef; 043import org.nuxeo.ecm.core.api.IdRef; 044import org.nuxeo.ecm.platform.query.api.PageProvider; 045import org.nuxeo.ecm.platform.query.api.PageProviderDefinition; 046import org.nuxeo.ecm.platform.query.api.PageProviderService; 047import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 048import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 049import org.nuxeo.elasticsearch.commands.IndexingCommand; 050import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 051import org.nuxeo.runtime.api.Framework; 052import org.nuxeo.runtime.metrics.MetricsService; 053 054import com.codahale.metrics.MetricRegistry; 055import com.codahale.metrics.SharedMetricRegistries; 056import com.codahale.metrics.Timer; 057 058/** 059 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a> 060 */ 061@Name("esAdmin") 062@Scope(CONVERSATION) 063public class ElasticSearchManager implements Serializable { 064 065 private static final long serialVersionUID = 1L; 066 067 private static final Log log = LogFactory.getLog(ElasticSearchManager.class); 068 069 private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document"; 070 071 private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}"; 072 073 private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo"; 074 075 @In(create = true) 076 protected ElasticSearchAdmin esa; 077 078 @In(create = true) 079 protected ElasticSearchIndexing esi; 080 081 @In(create = true, required = false) 082 protected transient CoreSession documentManager; 083 084 protected List<PageProviderStatus> ppStatuses = null; 085 086 protected Timer indexTimer; 087 088 protected Timer bulkIndexTimer; 089 090 private String rootId; 091 092 private String nxql = DEFAULT_NXQL_QUERY; 093 094 private List<String> repositoryNames; 095 096 private String repositoryName; 097 098 private Boolean dropIndex = false; 099 100 public String getNodesInfo() { 101 NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet(); 102 return nodesInfo.toString(); 103 } 104 105 public String getNodesStats() { 106 NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet(); 107 return stats.toString(); 108 } 109 110 public String getNodesHealth() { 111 String[] indices = getIndexNames(); 112 ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get(); 113 return health.toString(); 114 } 115 116 public void startReindexAll() { 117 String repositoryName = getRepositoryName(); 118 log.warn("Re-indexing the entire repository: " + repositoryName); 119 esa.dropAndInitRepositoryIndex(repositoryName); 120 esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document"); 121 } 122 123 public void startReindexNxql() { 124 String repositoryName = getRepositoryName(); 125 log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName)); 126 esi.runReindexingWorker(repositoryName, getNxql()); 127 } 128 129 public void startReindexFrom() { 130 String repositoryName = getRepositoryName(); 131 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 132 log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, 133 repositoryName)); 134 String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName); 135 IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd); 136 esi.indexNonRecursive(rmCmd); 137 138 DocumentRef ref = new IdRef(rootId); 139 if (session.exists(ref)) { 140 DocumentModel doc = session.getDocument(ref); 141 log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc, 142 repositoryName)); 143 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true); 144 esi.runIndexingWorker(Arrays.asList(cmd)); 145 } 146 } 147 } 148 149 public void flush() { 150 esa.flush(); 151 } 152 153 public void optimize() { 154 esa.optimize(); 155 } 156 157 protected void introspectPageProviders() { 158 159 ppStatuses = new ArrayList<>(); 160 161 PageProviderService pps = Framework.getLocalService(PageProviderService.class); 162 for (String ppName : pps.getPageProviderDefinitionNames()) { 163 PageProviderDefinition def = pps.getPageProviderDefinition(ppName); 164 // Create an instance so class replacer is taken in account 165 PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null); 166 String klass = pp.getClass().getCanonicalName(); 167 ppStatuses.add(new PageProviderStatus(ppName, klass)); 168 } 169 Collections.sort(ppStatuses); 170 } 171 172 public List<PageProviderStatus> getContentViewStatus() { 173 if (ppStatuses == null) { 174 introspectPageProviders(); 175 } 176 return ppStatuses; 177 } 178 179 public Boolean isIndexingInProgress() { 180 return esa.isIndexingInProgress(); 181 } 182 183 public Boolean displayClusterInfo() { 184 if (esa.isEmbedded()) { 185 return true; 186 } 187 return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false")); 188 } 189 190 public String getPendingWorkerCount() { 191 return Long.valueOf(esa.getPendingWorkerCount()).toString(); 192 } 193 194 public String getRunningWorkerCount() { 195 return Long.valueOf(esa.getRunningWorkerCount()).toString(); 196 } 197 198 public String getTotalCommandProcessed() { 199 return Integer.valueOf(esa.getTotalCommandProcessed()).toString(); 200 } 201 202 public String getNumberOfDocuments() { 203 String[] indices = getIndexNames(); 204 CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get(); 205 return Long.valueOf(ret.getCount()).toString(); 206 } 207 208 private String[] getIndexNames() { 209 List<String> repositoryNames = getRepositoryNames(); 210 String indices[] = new String[repositoryNames.size()]; 211 int i = 0; 212 for (String repo : repositoryNames) { 213 indices[i++] = esa.getIndexNameForRepository(repo); 214 } 215 return indices; 216 } 217 218 public String getIndexingRates() { 219 if (indexTimer == null) { 220 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 221 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 222 223 } 224 return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(), 225 indexTimer.getFifteenMinuteRate()); 226 } 227 228 public String getBulkIndexingRates() { 229 if (bulkIndexTimer == null) { 230 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 231 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 232 233 } 234 return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(), 235 bulkIndexTimer.getFifteenMinuteRate()); 236 } 237 238 public String getRootId() { 239 return rootId; 240 } 241 242 public List<String> getRepositoryNames() { 243 if (repositoryNames == null) { 244 repositoryNames = esa.getRepositoryNames(); 245 } 246 return repositoryNames; 247 } 248 249 public void setRootId(String rootId) { 250 this.rootId = rootId; 251 } 252 253 public String getNxql() { 254 return nxql; 255 } 256 257 public void setNxql(String nxql) { 258 this.nxql = nxql; 259 } 260 261 public String getRepositoryName() { 262 if (repositoryName == null) { 263 List<String> repositoryNames = getRepositoryNames(); 264 if (!repositoryNames.isEmpty()) { 265 repositoryName = repositoryNames.get(0); 266 } 267 } 268 return repositoryName; 269 } 270 271 public void setRepositoryName(String repositoryName) { 272 this.repositoryName = repositoryName; 273 } 274 275 public Boolean getDropIndex() { 276 return dropIndex; 277 } 278 279 public void setDropIndex(Boolean dropIndex) { 280 this.dropIndex = dropIndex; 281 } 282}