001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Nuxeo
016 */
017package org.nuxeo.elasticsearch.web.admin;
018
019import static org.jboss.seam.ScopeType.EVENT;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
029import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
030import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
031import org.elasticsearch.action.count.CountResponse;
032import org.elasticsearch.index.query.QueryBuilders;
033import org.jboss.seam.annotations.In;
034import org.jboss.seam.annotations.Name;
035import org.jboss.seam.annotations.Scope;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.DocumentRef;
040import org.nuxeo.ecm.core.api.IdRef;
041import org.nuxeo.ecm.platform.query.api.PageProvider;
042import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
043import org.nuxeo.ecm.platform.query.api.PageProviderService;
044import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
045import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
046import org.nuxeo.elasticsearch.commands.IndexingCommand;
047import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.metrics.MetricsService;
050
051import com.codahale.metrics.MetricRegistry;
052import com.codahale.metrics.SharedMetricRegistries;
053import com.codahale.metrics.Timer;
054
055/**
056 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
057 */
058@Name("esAdmin")
059@Scope(EVENT)
060public class ElasticSearchManager {
061
062    private static final Log log = LogFactory.getLog(ElasticSearchManager.class);
063
064    private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document";
065
066    private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}";
067
068    private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo";
069
070    @In(create = true)
071    protected ElasticSearchAdmin esa;
072
073    @In(create = true)
074    protected ElasticSearchIndexing esi;
075
076    @In(create = true, required = false)
077    protected transient CoreSession documentManager;
078
079    protected List<PageProviderStatus> ppStatuses = null;
080
081    protected Timer indexTimer;
082
083    protected Timer bulkIndexTimer;
084
085    private String rootId;
086
087    private String nxql = DEFAULT_NXQL_QUERY;
088
089    private String repositoryName;
090
091    private Boolean dropIndex = false;
092
093    public String getNodesInfo() {
094        NodesInfoResponse nodesInfo = esa.getClient().admin().cluster().prepareNodesInfo().execute().actionGet();
095        return nodesInfo.toString();
096    }
097
098    public String getNodesStats() {
099        NodesStatsResponse stats = esa.getClient().admin().cluster().prepareNodesStats().execute().actionGet();
100        return stats.toString();
101    }
102
103    public String getNodesHealth() {
104        String[] indices = getIndexNames();
105        ClusterHealthResponse health = esa.getClient().admin().cluster().prepareHealth(indices).get();
106        return health.toString();
107    }
108
109    public void startReindexAll() {
110        log.warn("Re-indexing the entire repository: " + repositoryName);
111        esa.dropAndInitRepositoryIndex(repositoryName);
112        esi.runReindexingWorker(repositoryName, "SELECT ecm:uuid FROM Document");
113    }
114
115    public void startReindexNxql() {
116        log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName));
117        esi.runReindexingWorker(repositoryName, getNxql());
118    }
119
120    public void startReindexFrom() {
121        try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
122            log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, repositoryName));
123            String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName);
124            IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd);
125            esi.indexNonRecursive(rmCmd);
126
127            DocumentRef ref = new IdRef(rootId);
128            if (session.exists(ref)) {
129                DocumentModel doc = session.getDocument(ref);
130                log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc,
131                        repositoryName));
132                IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true);
133                esi.runIndexingWorker(Arrays.asList(cmd));
134            }
135        }
136    }
137
138    public void flush() {
139        esa.flush();
140    }
141
142    public void optimize() {
143        esa.optimize();
144    }
145
146    protected void introspectPageProviders() {
147
148        ppStatuses = new ArrayList<>();
149
150        PageProviderService pps = Framework.getLocalService(PageProviderService.class);
151        for (String ppName : pps.getPageProviderDefinitionNames()) {
152            PageProviderDefinition def = pps.getPageProviderDefinition(ppName);
153            // Create an instance so class replacer is taken in account
154            PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null);
155            String klass = pp.getClass().getCanonicalName();
156            ppStatuses.add(new PageProviderStatus(ppName, klass));
157        }
158        Collections.sort(ppStatuses);
159    }
160
161    public List<PageProviderStatus> getContentViewStatus() {
162        if (ppStatuses == null) {
163            introspectPageProviders();
164        }
165        return ppStatuses;
166    }
167
168    public Boolean isIndexingInProgress() {
169        return esa.isIndexingInProgress();
170    }
171
172    public Boolean displayClusterInfo() {
173        if (esa.isEmbedded()) {
174            return true;
175        }
176        return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false"));
177    }
178
179    public String getPendingWorkerCount() {
180        return Integer.valueOf(esa.getPendingWorkerCount()).toString();
181    }
182
183    public String getRunningWorkerCount() {
184        return Integer.valueOf(esa.getRunningWorkerCount()).toString();
185    }
186
187    public String getTotalCommandProcessed() {
188        return Integer.valueOf(esa.getTotalCommandProcessed()).toString();
189    }
190
191    public String getNumberOfDocuments() {
192        String[] indices = getIndexNames();
193        CountResponse ret = esa.getClient().prepareCount(indices).setQuery(QueryBuilders.matchAllQuery()).get();
194        return Long.valueOf(ret.getCount()).toString();
195    }
196
197    private String[] getIndexNames() {
198        String indices[]  = new String[esa.getRepositoryNames().size()];
199        int i=0;
200        for (String repo : esa.getRepositoryNames()) {
201            indices[i++] = esa.getIndexNameForRepository(repo);
202        }
203        return indices;
204    }
205
206    public String getIndexingRates() {
207        if (indexTimer == null) {
208            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
209            indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index"));
210
211        }
212        return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(),
213                indexTimer.getFifteenMinuteRate());
214    }
215
216    public String getBulkIndexingRates() {
217        if (bulkIndexTimer == null) {
218            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
219            bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex"));
220
221        }
222        return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(),
223                bulkIndexTimer.getFifteenMinuteRate());
224    }
225
226    public String getRootId() {
227        return rootId;
228    }
229
230    public List<String> getRepositoryNames() {
231        return esa.getRepositoryNames();
232    }
233
234    public void setRootId(String rootId) {
235        this.rootId = rootId;
236    }
237
238    public String getNxql() {
239        return nxql;
240    }
241
242    public void setNxql(String nxql) {
243        this.nxql = nxql;
244    }
245
246    public String getRepositoryName() {
247        return repositoryName;
248    }
249
250    public void setRepositoryName(String repositoryName) {
251        this.repositoryName = repositoryName;
252    }
253
254    public Boolean getDropIndex() {
255        return dropIndex;
256    }
257
258    public void setDropIndex(Boolean dropIndex) {
259        this.dropIndex = dropIndex;
260    }
261}