001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo
018 */
019package org.nuxeo.elasticsearch.web.admin;
020
021import static org.jboss.seam.ScopeType.CONVERSATION;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.List;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
032import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
033import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
034import org.elasticsearch.action.count.CountResponse;
035import org.elasticsearch.index.query.QueryBuilders;
036import org.jboss.seam.annotations.In;
037import org.jboss.seam.annotations.Name;
038import org.jboss.seam.annotations.Scope;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.platform.query.api.PageProvider;
045import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
046import org.nuxeo.ecm.platform.query.api.PageProviderService;
047import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
048import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
049import org.nuxeo.elasticsearch.commands.IndexingCommand;
050import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.metrics.MetricsService;
053
054import com.codahale.metrics.MetricRegistry;
055import com.codahale.metrics.SharedMetricRegistries;
056import com.codahale.metrics.Timer;
057
058/**
059 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
060 */
061@Name("esAdmin")
062@Scope(CONVERSATION)
063public class ElasticSearchManager implements Serializable {
064
065    private static final long serialVersionUID = 1L;
066
067    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);
068
069    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";
070
071    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";
072
073    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";
074
075    @In(create = true)
076    protected ElasticSearchAdmin esa;
077
078    @In(create = true)
079    protected ElasticSearchIndexing esi;
080
081    @In(create = true, required = false)
082    protected transient CoreSession documentManager;
083
084    protected List<PageProviderStatus> ppStatuses = null;
085
086    protected Timer indexTimer;
087
088    protected Timer bulkIndexTimer;
089
090    private String rootId;
091
092    private String nxql = DEFAULT_NXQL_QUERY;
093
094    private List<String> repositoryNames;
095
096    private String repositoryName;
097
098    private Boolean dropIndex = false;
099
100    public String getNodesInfo() {
101        NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet();
102        return nodesInfo.toString();
103    }
104
105    public String getNodesStats() {
106        NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet();
107        return stats.toString();
108    }
109
110    public String getNodesHealth() {
111        String[] indices = getIndexNames();
112        ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get();
113        return health.toString();
114    }
115
116    public void startReindexAll() {
117        String repositoryName = getRepositoryName();
118        log.warn("Re-indexing the entire repository: " + repositoryName);
119        esa.dropAndInitRepositoryIndex(repositoryName);
120        esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
121    }
122
123    public void startReindexNxql() {
124        String repositoryName = getRepositoryName();
125        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
126        esi.runReindexingWorker(repositoryName, getNxql());
127    }
128
129    public void startReindexFrom() {
130        String repositoryName = getRepositoryName();
131        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
132            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId,
133                    repositoryName));
134            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
135            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
136            esi.indexNonRecursive(rmCmd);
137
138            DocumentRef ref = new IdRef(rootId);
139            if (session.exists(ref)) {
140                DocumentModel doc = session.getDocument(ref);
141                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
142                        repositoryName));
143                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
144                esi.runIndexingWorker(Arrays.asList(cmd));
145            }
146        }
147    }
148
149    public void flush() {
150        esa.flush();
151    }
152
153    public void optimize() {
154        esa.optimize();
155    }
156
157    protected void introspectPageProviders() {
158
159        ppStatuses = new ArrayList<>();
160
161        PageProviderService pps = Framework.getLocalService(PageProviderService.class);
162        for (String ppName : pps.getPageProviderDefinitionNames()) {
163            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
164            // Create an instance so class replacer is taken in account
165            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
166            String klass = pp.getClass().getCanonicalName();
167            ppStatuses.add(new PageProviderStatus(ppName, klass));
168        }
169        Collections.sort(ppStatuses);
170    }
171
172    public List<PageProviderStatus> getContentViewStatus() {
173        if (ppStatuses == null) {
174            introspectPageProviders();
175        }
176        return ppStatuses;
177    }
178
179    public Boolean isIndexingInProgress() {
180        return esa.isIndexingInProgress();
181    }
182
183    public Boolean displayClusterInfo() {
184        if (esa.isEmbedded()) {
185            return true;
186        }
187        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
188    }
189
190    public String getPendingWorkerCount() {
191        return Long.valueOf(esa.getPendingWorkerCount()).toString();
192    }
193
194    public String getRunningWorkerCount() {
195        return Long.valueOf(esa.getRunningWorkerCount()).toString();
196    }
197
198    public String getTotalCommandProcessed() {
199        return Integer.valueOf(esa.getTotalCommandProcessed()).toString();
200    }
201
202    public String getNumberOfDocuments() {
203        String[] indices = getIndexNames();
204        CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get();
205        return Long.valueOf(ret.getCount()).toString();
206    }
207
208    private String[] getIndexNames() {
209        List<String> repositoryNames = getRepositoryNames();
210        String indices[] = new String[repositoryNames.size()];
211        int i = 0;
212        for (String repo : repositoryNames) {
213            indices[i++] = esa.getIndexNameForRepository(repo);
214        }
215        return indices;
216    }
217
218    public String getIndexingRates() {
219        if (indexTimer == null) {
220            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
221            indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
222
223        }
224        return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(),
225                indexTimer.getFifteenMinuteRate());
226    }
227
228    public String getBulkIndexingRates() {
229        if (bulkIndexTimer == null) {
230            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
231            bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
232
233        }
234        return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(),
235                bulkIndexTimer.getFifteenMinuteRate());
236    }
237
238    public String getRootId() {
239        return rootId;
240    }
241
242    public List<String> getRepositoryNames() {
243        if (repositoryNames == null) {
244            repositoryNames = esa.getRepositoryNames();
245        }
246        return repositoryNames;
247    }
248
249    public void setRootId(String rootId) {
250        this.rootId = rootId;
251    }
252
253    public String getNxql() {
254        return nxql;
255    }
256
257    public void setNxql(String nxql) {
258        this.nxql = nxql;
259    }
260
261    public String getRepositoryName() {
262        if (repositoryName == null) {
263            List<String> repositoryNames = getRepositoryNames();
264            if (!repositoryNames.isEmpty()) {
265                repositoryName = repositoryNames.get(0);
266            }
267        }
268        return repositoryName;
269    }
270
271    public void setRepositoryName(String repositoryName) {
272        this.repositoryName = repositoryName;
273    }
274
275    public Boolean getDropIndex() {
276        return dropIndex;
277    }
278
279    public void setDropIndex(Boolean dropIndex) {
280        this.dropIndex = dropIndex;
281    }
282}