001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo 018 */ 019package org.nuxeo.elasticsearch.web.admin; 020 021import com.codahale.metrics.MetricRegistry; 022import com.codahale.metrics.SharedMetricRegistries; 023import com.codahale.metrics.Timer; 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.elasticsearch.action.search.SearchRequest; 027import org.elasticsearch.action.search.SearchResponse; 028import org.elasticsearch.index.query.QueryBuilders; 029import org.elasticsearch.search.builder.SearchSourceBuilder; 030import org.jboss.seam.annotations.In; 031import org.jboss.seam.annotations.Name; 032import org.jboss.seam.annotations.Scope; 033import org.nuxeo.ecm.core.api.CoreInstance; 034import org.nuxeo.ecm.core.api.CoreSession; 035import org.nuxeo.ecm.core.api.DocumentModel; 036import org.nuxeo.ecm.core.api.DocumentRef; 037import org.nuxeo.ecm.core.api.IdRef; 038import org.nuxeo.ecm.platform.query.api.PageProvider; 039import org.nuxeo.ecm.platform.query.api.PageProviderDefinition; 040import org.nuxeo.ecm.platform.query.api.PageProviderService; 041import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 042import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 043import org.nuxeo.elasticsearch.commands.IndexingCommand; 044import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.metrics.MetricsService; 047 048import java.io.Serializable; 049import java.util.ArrayList; 050import java.util.Arrays; 051import java.util.Collections; 052import java.util.List; 053 054import static org.jboss.seam.ScopeType.CONVERSATION; 055 056/** 057 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a> 058 */ 059@Name("esAdmin") 060@Scope(CONVERSATION) 061public class ElasticSearchManager implements Serializable { 062 063 private static final long serialVersionUID = 1L; 064 065 private static final Log log = LogFactory.getLog(ElasticSearchManager.class); 066 067 private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document"; 068 069 private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}"; 070 071 private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo"; 072 073 @In(create = true) 074 protected ElasticSearchAdmin esa; 075 076 @In(create = true) 077 protected ElasticSearchIndexing esi; 078 079 @In(create = true, required = false) 080 protected transient CoreSession documentManager; 081 082 protected List<PageProviderStatus> ppStatuses = null; 083 084 protected Timer indexTimer; 085 086 protected Timer bulkIndexTimer; 087 088 private String rootId; 089 090 private String nxql = DEFAULT_NXQL_QUERY; 091 092 private List<String> repositoryNames; 093 094 private String repositoryName; 095 096 private Boolean dropIndex = false; 097 098 public String getNodesInfo() { 099 return esa.getClient().getNodesInfo(); 100 } 101 102 public String getNodesStats() { 103 return esa.getClient().getNodesStats(); 104 } 105 106 public String getNodesHealth() { 107 String[] indices = getIndexNames(); 108 return esa.getClient().getHealthStatus(indices).toString(); 109 } 110 111 public void startReindexAll() { 112 String repositoryName = getRepositoryName(); 113 log.warn("Re-indexing the entire repository: " + repositoryName); 114 esi.reindexRepository(repositoryName); 115 } 116 117 public void startReindexNxql() { 118 String repositoryName = getRepositoryName(); 119 log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName)); 120 esi.runReindexingWorker(repositoryName, getNxql()); 121 } 122 123 public void startReindexFrom() { 124 String repositoryName = getRepositoryName(); 125 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 126 log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, 127 repositoryName)); 128 String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName); 129 IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd); 130 esi.indexNonRecursive(rmCmd); 131 132 DocumentRef ref = new IdRef(rootId); 133 if (session.exists(ref)) { 134 DocumentModel doc = session.getDocument(ref); 135 log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc, 136 repositoryName)); 137 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true); 138 esi.runIndexingWorker(Arrays.asList(cmd)); 139 } 140 } 141 } 142 143 public void flush() { 144 esa.flush(); 145 } 146 147 public void optimize() { 148 esa.optimize(); 149 } 150 151 protected void introspectPageProviders() { 152 153 ppStatuses = new ArrayList<>(); 154 155 PageProviderService pps = Framework.getService(PageProviderService.class); 156 for (String ppName : pps.getPageProviderDefinitionNames()) { 157 PageProviderDefinition def = pps.getPageProviderDefinition(ppName); 158 // Create an instance so class replacer is taken in account 159 PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null); 160 String klass = pp.getClass().getCanonicalName(); 161 ppStatuses.add(new PageProviderStatus(ppName, klass)); 162 } 163 Collections.sort(ppStatuses); 164 } 165 166 public List<PageProviderStatus> getContentViewStatus() { 167 if (ppStatuses == null) { 168 introspectPageProviders(); 169 } 170 return ppStatuses; 171 } 172 173 public Boolean isIndexingInProgress() { 174 return esa.isIndexingInProgress(); 175 } 176 177 public Boolean displayClusterInfo() { 178 if (esa.isEmbedded()) { 179 return true; 180 } 181 return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false")); 182 } 183 184 public String getPendingWorkerCount() { 185 return Long.valueOf(esa.getPendingWorkerCount()).toString(); 186 } 187 188 public String getRunningWorkerCount() { 189 return Long.valueOf(esa.getRunningWorkerCount()).toString(); 190 } 191 192 public String getTotalCommandProcessed() { 193 return Integer.valueOf(esa.getTotalCommandProcessed()).toString(); 194 } 195 196 public String getNumberOfDocuments() { 197 String[] indices = getIndexNames(); 198 SearchResponse ret = esa.getClient().search(new SearchRequest(indices) 199 .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(0))); 200 return Long.valueOf(ret.getHits().totalHits).toString(); 201 } 202 203 private String[] getIndexNames() { 204 List<String> repositoryNames = getRepositoryNames(); 205 String indices[] = new String[repositoryNames.size()]; 206 int i = 0; 207 for (String repo : repositoryNames) { 208 indices[i++] = esa.getIndexNameForRepository(repo); 209 } 210 return indices; 211 } 212 213 public String getIndexingRates() { 214 if (indexTimer == null) { 215 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 216 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 217 218 } 219 return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(), 220 indexTimer.getFifteenMinuteRate()); 221 } 222 223 public String getBulkIndexingRates() { 224 if (bulkIndexTimer == null) { 225 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 226 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 227 228 } 229 return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(), 230 bulkIndexTimer.getFifteenMinuteRate()); 231 } 232 233 public String getRootId() { 234 return rootId; 235 } 236 237 public List<String> getRepositoryNames() { 238 if (repositoryNames == null) { 239 repositoryNames = esa.getRepositoryNames(); 240 } 241 return repositoryNames; 242 } 243 244 public void setRootId(String rootId) { 245 this.rootId = rootId; 246 } 247 248 public String getNxql() { 249 return nxql; 250 } 251 252 public void setNxql(String nxql) { 253 this.nxql = nxql; 254 } 255 256 public String getRepositoryName() { 257 if (repositoryName == null) { 258 List<String> repositoryNames = getRepositoryNames(); 259 if (!repositoryNames.isEmpty()) { 260 repositoryName = repositoryNames.get(0); 261 } 262 } 263 return repositoryName; 264 } 265 266 public void setRepositoryName(String repositoryName) { 267 this.repositoryName = repositoryName; 268 } 269 270 public Boolean getDropIndex() { 271 return dropIndex; 272 } 273 274 public void setDropIndex(Boolean dropIndex) { 275 this.dropIndex = dropIndex; 276 } 277}