001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo
018 */
019package org.nuxeo.elasticsearch.web.admin;
020
021import com.codahale.metrics.MetricRegistry;
022import com.codahale.metrics.SharedMetricRegistries;
023import com.codahale.metrics.Timer;
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.elasticsearch.action.search.SearchRequest;
027import org.elasticsearch.action.search.SearchResponse;
028import org.elasticsearch.index.query.QueryBuilders;
029import org.elasticsearch.search.builder.SearchSourceBuilder;
030import org.jboss.seam.annotations.In;
031import org.jboss.seam.annotations.Name;
032import org.jboss.seam.annotations.Scope;
033import org.nuxeo.ecm.core.api.CloseableCoreSession;
034import org.nuxeo.ecm.core.api.CoreInstance;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.DocumentModel;
037import org.nuxeo.ecm.core.api.DocumentRef;
038import org.nuxeo.ecm.core.api.IdRef;
039import org.nuxeo.ecm.platform.query.api.PageProvider;
040import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
041import org.nuxeo.ecm.platform.query.api.PageProviderService;
042import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
043import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
044import org.nuxeo.elasticsearch.commands.IndexingCommand;
045import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.metrics.MetricsService;
048
049import java.io.Serializable;
050import java.util.ArrayList;
051import java.util.Arrays;
052import java.util.Collections;
053import java.util.List;
054
055import static org.jboss.seam.ScopeType.CONVERSATION;
056
057/**
058 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
059 */
060@Name("esAdmin")
061@Scope(CONVERSATION)
062public class ElasticSearchManager implements Serializable {
063
064    private static final long serialVersionUID = 1L;
065
066    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);
067
068    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";
069
070    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";
071
072    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";
073
074    @In(create = true)
075    protected ElasticSearchAdmin esa;
076
077    @In(create = true)
078    protected ElasticSearchIndexing esi;
079
080    @In(create = true, required = false)
081    protected transient CoreSession documentManager;
082
083    protected List<PageProviderStatus> ppStatuses = null;
084
085    protected Timer indexTimer;
086
087    protected Timer bulkIndexTimer;
088
089    private String rootId;
090
091    private String nxql = DEFAULT_NXQL_QUERY;
092
093    private List<String> repositoryNames;
094
095    private String repositoryName;
096
097    private Boolean dropIndex = false;
098
099    public String getNodesInfo() {
100        return esa.getClient().getNodesInfo();
101    }
102
103    public String getNodesStats() {
104        return esa.getClient().getNodesStats();
105    }
106
107    public String getNodesHealth() {
108        String[] indices = getIndexNames();
109        return esa.getClient().getHealthStatus(indices).toString();
110    }
111
112    public void startReindexAll() {
113        String repositoryName = getRepositoryName();
114        log.warn("Re-indexing the entire repository: " + repositoryName);
115        esi.reindexRepository(repositoryName);
116    }
117
118    public void startReindexNxql() {
119        String repositoryName = getRepositoryName();
120        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
121        esi.runReindexingWorker(repositoryName, getNxql());
122    }
123
124    public void startReindexFrom() {
125        String repositoryName = getRepositoryName();
126        try (CloseableCoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
127            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId,
128                    repositoryName));
129            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
130            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
131            esi.indexNonRecursive(rmCmd);
132
133            DocumentRef ref = new IdRef(rootId);
134            if (session.exists(ref)) {
135                DocumentModel doc = session.getDocument(ref);
136                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
137                        repositoryName));
138                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
139                esi.runIndexingWorker(Arrays.asList(cmd));
140            }
141        }
142    }
143
144    public void flush() {
145        esa.flush();
146    }
147
148    public void optimize() {
149        esa.optimize();
150    }
151
152    protected void introspectPageProviders() {
153
154        ppStatuses = new ArrayList<>();
155
156        PageProviderService pps = Framework.getService(PageProviderService.class);
157        for (String ppName : pps.getPageProviderDefinitionNames()) {
158            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
159            // Create an instance so class replacer is taken in account
160            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
161            String klass = pp.getClass().getCanonicalName();
162            ppStatuses.add(new PageProviderStatus(ppName, klass));
163        }
164        Collections.sort(ppStatuses);
165    }
166
167    public List<PageProviderStatus> getContentViewStatus() {
168        if (ppStatuses == null) {
169            introspectPageProviders();
170        }
171        return ppStatuses;
172    }
173
174    public Boolean isIndexingInProgress() {
175        return esa.isIndexingInProgress();
176    }
177
178    public Boolean displayClusterInfo() {
179        if (esa.isEmbedded()) {
180            return true;
181        }
182        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
183    }
184
185    public String getPendingWorkerCount() {
186        return Long.valueOf(esa.getPendingWorkerCount()).toString();
187    }
188
189    public String getRunningWorkerCount() {
190        return Long.valueOf(esa.getRunningWorkerCount()).toString();
191    }
192
193    public String getTotalCommandProcessed() {
194        return Integer.valueOf(esa.getTotalCommandProcessed()).toString();
195    }
196
197    public String getNumberOfDocuments() {
198        String[] indices = getIndexNames();
199        SearchResponse ret = esa.getClient().search(new SearchRequest(indices)
200                .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(0)));
201        return Long.valueOf(ret.getHits().totalHits).toString();
202    }
203
204    private String[] getIndexNames() {
205        List<String> repositoryNames = getRepositoryNames();
206        String indices[] = new String[repositoryNames.size()];
207        int i = 0;
208        for (String repo : repositoryNames) {
209            indices[i++] = esa.getIndexNameForRepository(repo);
210        }
211        return indices;
212    }
213
214    public String getIndexingRates() {
215        if (indexTimer == null) {
216            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
217            indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
218
219        }
220        return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(),
221                indexTimer.getFifteenMinuteRate());
222    }
223
224    public String getBulkIndexingRates() {
225        if (bulkIndexTimer == null) {
226            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
227            bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
228
229        }
230        return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(),
231                bulkIndexTimer.getFifteenMinuteRate());
232    }
233
234    public String getRootId() {
235        return rootId;
236    }
237
238    public List<String> getRepositoryNames() {
239        if (repositoryNames == null) {
240            repositoryNames = esa.getRepositoryNames();
241        }
242        return repositoryNames;
243    }
244
245    public void setRootId(String rootId) {
246        this.rootId = rootId;
247    }
248
249    public String getNxql() {
250        return nxql;
251    }
252
253    public void setNxql(String nxql) {
254        this.nxql = nxql;
255    }
256
257    public String getRepositoryName() {
258        if (repositoryName == null) {
259            List<String> repositoryNames = getRepositoryNames();
260            if (!repositoryNames.isEmpty()) {
261                repositoryName = repositoryNames.get(0);
262            }
263        }
264        return repositoryName;
265    }
266
267    public void setRepositoryName(String repositoryName) {
268        this.repositoryName = repositoryName;
269    }
270
271    public Boolean getDropIndex() {
272        return dropIndex;
273    }
274
275    public void setDropIndex(Boolean dropIndex) {
276        this.dropIndex = dropIndex;
277    }
278}