001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo
018 */
019package org.nuxeo.elasticsearch.web.admin;
020
021import com.codahale.metrics.MetricRegistry;
022import com.codahale.metrics.SharedMetricRegistries;
023import com.codahale.metrics.Timer;
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.elasticsearch.action.search.SearchRequest;
027import org.elasticsearch.action.search.SearchResponse;
028import org.elasticsearch.index.query.QueryBuilders;
029import org.elasticsearch.search.builder.SearchSourceBuilder;
030import org.jboss.seam.annotations.In;
031import org.jboss.seam.annotations.Name;
032import org.jboss.seam.annotations.Scope;
033import org.nuxeo.ecm.core.api.CoreInstance;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.DocumentModel;
036import org.nuxeo.ecm.core.api.DocumentRef;
037import org.nuxeo.ecm.core.api.IdRef;
038import org.nuxeo.ecm.platform.query.api.PageProvider;
039import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
040import org.nuxeo.ecm.platform.query.api.PageProviderService;
041import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
042import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
043import org.nuxeo.elasticsearch.commands.IndexingCommand;
044import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.metrics.MetricsService;
047
048import java.io.Serializable;
049import java.util.ArrayList;
050import java.util.Arrays;
051import java.util.Collections;
052import java.util.List;
053
054import static org.jboss.seam.ScopeType.CONVERSATION;
055
056/**
057 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
058 */
059@Name("esAdmin")
060@Scope(CONVERSATION)
061public class ElasticSearchManager implements Serializable {
062
063    private static final long serialVersionUID = 1L;
064
065    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);
066
067    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";
068
069    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";
070
071    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";
072
073    @In(create = true)
074    protected ElasticSearchAdmin esa;
075
076    @In(create = true)
077    protected ElasticSearchIndexing esi;
078
079    @In(create = true, required = false)
080    protected transient CoreSession documentManager;
081
082    protected List<PageProviderStatus> ppStatuses = null;
083
084    protected Timer indexTimer;
085
086    protected Timer bulkIndexTimer;
087
088    private String rootId;
089
090    private String nxql = DEFAULT_NXQL_QUERY;
091
092    private List<String> repositoryNames;
093
094    private String repositoryName;
095
096    private Boolean dropIndex = false;
097
098    public String getNodesInfo() {
099        return esa.getClient().getNodesInfo();
100    }
101
102    public String getNodesStats() {
103        return esa.getClient().getNodesStats();
104    }
105
106    public String getNodesHealth() {
107        String[] indices = getIndexNames();
108        return esa.getClient().getHealthStatus(indices).toString();
109    }
110
111    public void startReindexAll() {
112        String repositoryName = getRepositoryName();
113        log.warn("Re-indexing the entire repository: " + repositoryName);
114        esi.reindexRepository(repositoryName);
115    }
116
117    public void startReindexNxql() {
118        String repositoryName = getRepositoryName();
119        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
120        esi.runReindexingWorker(repositoryName, getNxql());
121    }
122
123    public void startReindexFrom() {
124        String repositoryName = getRepositoryName();
125        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
126            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId,
127                    repositoryName));
128            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
129            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
130            esi.indexNonRecursive(rmCmd);
131
132            DocumentRef ref = new IdRef(rootId);
133            if (session.exists(ref)) {
134                DocumentModel doc = session.getDocument(ref);
135                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
136                        repositoryName));
137                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
138                esi.runIndexingWorker(Arrays.asList(cmd));
139            }
140        }
141    }
142
143    public void flush() {
144        esa.flush();
145    }
146
147    public void optimize() {
148        esa.optimize();
149    }
150
151    protected void introspectPageProviders() {
152
153        ppStatuses = new ArrayList<>();
154
155        PageProviderService pps = Framework.getService(PageProviderService.class);
156        for (String ppName : pps.getPageProviderDefinitionNames()) {
157            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
158            // Create an instance so class replacer is taken in account
159            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
160            String klass = pp.getClass().getCanonicalName();
161            ppStatuses.add(new PageProviderStatus(ppName, klass));
162        }
163        Collections.sort(ppStatuses);
164    }
165
166    public List<PageProviderStatus> getContentViewStatus() {
167        if (ppStatuses == null) {
168            introspectPageProviders();
169        }
170        return ppStatuses;
171    }
172
173    public Boolean isIndexingInProgress() {
174        return esa.isIndexingInProgress();
175    }
176
177    public Boolean displayClusterInfo() {
178        if (esa.isEmbedded()) {
179            return true;
180        }
181        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
182    }
183
184    public String getPendingWorkerCount() {
185        return Long.valueOf(esa.getPendingWorkerCount()).toString();
186    }
187
188    public String getRunningWorkerCount() {
189        return Long.valueOf(esa.getRunningWorkerCount()).toString();
190    }
191
192    public String getTotalCommandProcessed() {
193        return Integer.valueOf(esa.getTotalCommandProcessed()).toString();
194    }
195
196    public String getNumberOfDocuments() {
197        String[] indices = getIndexNames();
198        SearchResponse ret = esa.getClient().search(new SearchRequest(indices)
199                .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(0)));
200        return Long.valueOf(ret.getHits().totalHits).toString();
201    }
202
203    private String[] getIndexNames() {
204        List<String> repositoryNames = getRepositoryNames();
205        String indices[] = new String[repositoryNames.size()];
206        int i = 0;
207        for (String repo : repositoryNames) {
208            indices[i++] = esa.getIndexNameForRepository(repo);
209        }
210        return indices;
211    }
212
213    public String getIndexingRates() {
214        if (indexTimer == null) {
215            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
216            indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
217
218        }
219        return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(),
220                indexTimer.getFifteenMinuteRate());
221    }
222
223    public String getBulkIndexingRates() {
224        if (bulkIndexTimer == null) {
225            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
226            bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
227
228        }
229        return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(),
230                bulkIndexTimer.getFifteenMinuteRate());
231    }
232
233    public String getRootId() {
234        return rootId;
235    }
236
237    public List<String> getRepositoryNames() {
238        if (repositoryNames == null) {
239            repositoryNames = esa.getRepositoryNames();
240        }
241        return repositoryNames;
242    }
243
244    public void setRootId(String rootId) {
245        this.rootId = rootId;
246    }
247
248    public String getNxql() {
249        return nxql;
250    }
251
252    public void setNxql(String nxql) {
253        this.nxql = nxql;
254    }
255
256    public String getRepositoryName() {
257        if (repositoryName == null) {
258            List<String> repositoryNames = getRepositoryNames();
259            if (!repositoryNames.isEmpty()) {
260                repositoryName = repositoryNames.get(0);
261            }
262        }
263        return repositoryName;
264    }
265
266    public void setRepositoryName(String repositoryName) {
267        this.repositoryName = repositoryName;
268    }
269
270    public Boolean getDropIndex() {
271        return dropIndex;
272    }
273
274    public void setDropIndex(Boolean dropIndex) {
275        this.dropIndex = dropIndex;
276    }
277}