001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo
018 */
019package org.nuxeo.elasticsearch.web.admin;
020
021import static org.jboss.seam.ScopeType.EVENT;
022
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.List;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
031import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
032import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
033import org.elasticsearch.action.count.CountResponse;
034import org.elasticsearch.index.query.QueryBuilders;
035import org.jboss.seam.annotations.In;
036import org.jboss.seam.annotations.Name;
037import org.jboss.seam.annotations.Scope;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentRef;
042import org.nuxeo.ecm.core.api.IdRef;
043import org.nuxeo.ecm.platform.query.api.PageProvider;
044import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
045import org.nuxeo.ecm.platform.query.api.PageProviderService;
046import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
047import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
048import org.nuxeo.elasticsearch.commands.IndexingCommand;
049import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
050import org.nuxeo.runtime.api.Framework;
051import org.nuxeo.runtime.metrics.MetricsService;
052
053import com.codahale.metrics.MetricRegistry;
054import com.codahale.metrics.SharedMetricRegistries;
055import com.codahale.metrics.Timer;
056
057/**
058 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
059 */
060@Name("esAdmin")
061@Scope(EVENT)
062public class ElasticSearchManager {
063
064    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);
065
066    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";
067
068    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";
069
070    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";
071
072    @In(create = true)
073    protected ElasticSearchAdmin esa;
074
075    @In(create = true)
076    protected ElasticSearchIndexing esi;
077
078    @In(create = true, required = false)
079    protected transient CoreSession documentManager;
080
081    protected List<PageProviderStatus> ppStatuses = null;
082
083    protected Timer indexTimer;
084
085    protected Timer bulkIndexTimer;
086
087    private String rootId;
088
089    private String nxql = DEFAULT_NXQL_QUERY;
090
091    private String repositoryName;
092
093    private Boolean dropIndex = false;
094
095    public String getNodesInfo() {
096        NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet();
097        return nodesInfo.toString();
098    }
099
100    public String getNodesStats() {
101        NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet();
102        return stats.toString();
103    }
104
105    public String getNodesHealth() {
106        String[] indices = getIndexNames();
107        ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get();
108        return health.toString();
109    }
110
111    public void startReindexAll() {
112        log.warn("Re-indexing the entire repository: " + repositoryName);
113        esa.dropAndInitRepositoryIndex(repositoryName);
114        esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
115    }
116
117    public void startReindexNxql() {
118        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
119        esi.runReindexingWorker(repositoryName, getNxql());
120    }
121
122    public void startReindexFrom() {
123        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
124            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, repositoryName));
125            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
126            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
127            esi.indexNonRecursive(rmCmd);
128
129            DocumentRef ref = new IdRef(rootId);
130            if (session.exists(ref)) {
131                DocumentModel doc = session.getDocument(ref);
132                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
133                        repositoryName));
134                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
135                esi.runIndexingWorker(Arrays.asList(cmd));
136            }
137        }
138    }
139
140    public void flush() {
141        esa.flush();
142    }
143
144    public void optimize() {
145        esa.optimize();
146    }
147
148    protected void introspectPageProviders() {
149
150        ppStatuses = new ArrayList<>();
151
152        PageProviderService pps = Framework.getLocalService(PageProviderService.class);
153        for (String ppName : pps.getPageProviderDefinitionNames()) {
154            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
155            // Create an instance so class replacer is taken in account
156            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
157            String klass = pp.getClass().getCanonicalName();
158            ppStatuses.add(new PageProviderStatus(ppName, klass));
159        }
160        Collections.sort(ppStatuses);
161    }
162
163    public List<PageProviderStatus> getContentViewStatus() {
164        if (ppStatuses == null) {
165            introspectPageProviders();
166        }
167        return ppStatuses;
168    }
169
170    public Boolean isIndexingInProgress() {
171        return esa.isIndexingInProgress();
172    }
173
174    public Boolean displayClusterInfo() {
175        if (esa.isEmbedded()) {
176            return true;
177        }
178        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
179    }
180
181    public String getPendingWorkerCount() {
182        return Long.valueOf(esa.getPendingWorkerCount()).toString();
183    }
184
185    public String getRunningWorkerCount() {
186        return Long.valueOf(esa.getRunningWorkerCount()).toString();
187    }
188
189    public String getTotalCommandProcessed() {
190        return Integer.valueOf(esa.getTotalCommandProcessed()).toString();
191    }
192
193    public String getNumberOfDocuments() {
194        String[] indices = getIndexNames();
195        CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get();
196        return Long.valueOf(ret.getCount()).toString();
197    }
198
199    private String[] getIndexNames() {
200        String indices[]  = new String[esa.getRepositoryNames().size()];
201        int i=0;
202        for (String repo : esa.getRepositoryNames()) {
203            indices[i++] = esa.getIndexNameForRepository(repo);
204        }
205        return indices;
206    }
207
208    public String getIndexingRates() {
209        if (indexTimer == null) {
210            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
211            indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
212
213        }
214        return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(),
215                indexTimer.getFifteenMinuteRate());
216    }
217
218    public String getBulkIndexingRates() {
219        if (bulkIndexTimer == null) {
220            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
221            bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
222
223        }
224        return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(),
225                bulkIndexTimer.getFifteenMinuteRate());
226    }
227
228    public String getRootId() {
229        return rootId;
230    }
231
232    public List<String> getRepositoryNames() {
233        return esa.getRepositoryNames();
234    }
235
236    public void setRootId(String rootId) {
237        this.rootId = rootId;
238    }
239
240    public String getNxql() {
241        return nxql;
242    }
243
244    public void setNxql(String nxql) {
245        this.nxql = nxql;
246    }
247
248    public String getRepositoryName() {
249        return repositoryName;
250    }
251
252    public void setRepositoryName(String repositoryName) {
253        this.repositoryName = repositoryName;
254    }
255
256    public Boolean getDropIndex() {
257        return dropIndex;
258    }
259
260    public void setDropIndex(Boolean dropIndex) {
261        this.dropIndex = dropIndex;
262    }
263}