001/*
002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import java.util.List;
024
025import org.apache.commons.lang3.StringUtils;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.search.ClearScrollRequest;
029import org.elasticsearch.action.search.SearchRequest;
030import org.elasticsearch.action.search.SearchResponse;
031import org.elasticsearch.action.search.SearchScrollRequest;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.common.unit.TimeValue;
034import org.elasticsearch.index.query.QueryBuilder;
035import org.elasticsearch.search.aggregations.Aggregation;
036import org.elasticsearch.search.aggregations.bucket.filter.Filter;
037import org.elasticsearch.search.builder.SearchSourceBuilder;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModelList;
040import org.nuxeo.ecm.core.api.IterableQueryResult;
041import org.nuxeo.ecm.core.api.SortInfo;
042import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
043import org.nuxeo.ecm.platform.query.api.Aggregate;
044import org.nuxeo.ecm.platform.query.api.Bucket;
045import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
046import org.nuxeo.elasticsearch.api.ElasticSearchService;
047import org.nuxeo.elasticsearch.api.EsResult;
048import org.nuxeo.elasticsearch.api.EsScrollResult;
049import org.nuxeo.elasticsearch.fetcher.Fetcher;
050import org.nuxeo.elasticsearch.query.NxQueryBuilder;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.metrics.MetricsService;
053
054import io.dropwizard.metrics5.MetricName;
055import io.dropwizard.metrics5.MetricRegistry;
056import io.dropwizard.metrics5.SharedMetricRegistries;
057import io.dropwizard.metrics5.Timer;
058import io.dropwizard.metrics5.Timer.Context;
059
060/**
061 * @since 6.0
062 */
063public class ElasticSearchServiceImpl implements ElasticSearchService {
064    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
065
066    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
067
068    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(
069            Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
070
071    // Metrics
072    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
073
074    protected final Timer searchTimer;
075
076    protected final Timer scrollTimer;
077
078    protected final Timer fetchTimer;
079
080    private final ElasticSearchAdminImpl esa;
081
082    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
083        this.esa = esa;
084        searchTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "search"));
085        scrollTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "scroll"));
086        fetchTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "fetch"));
087    }
088
089    @Deprecated
090    @Override
091    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
092        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
093        return query(query);
094    }
095
096    @Deprecated
097    @Override
098    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
099            SortInfo... sortInfos) {
100        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
101                                                          .limit(limit)
102                                                          .offset(offset)
103                                                          .addSort(sortInfos);
104        return query(query);
105    }
106
107    @Override
108    public DocumentModelList query(NxQueryBuilder queryBuilder) {
109        return queryAndAggregate(queryBuilder).getDocuments();
110    }
111
112    @SuppressWarnings("resource") // IterableQueryResult closed by EsResult.getRows().close()
113    @Override
114    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
115        SearchResponse response = search(queryBuilder);
116        List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response);
117        if (queryBuilder.returnsDocuments()) {
118            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
119            return new EsResult(docs, aggs, response);
120        } else if (queryBuilder.returnsRows()) {
121            IterableQueryResult rows = getRows(queryBuilder, response);
122            return new EsResult(rows, aggs, response);
123        }
124        return new EsResult(response);
125    }
126
127    @Override
128    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
129        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
130    }
131
132    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
133        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
134        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
135    }
136
137    @Override
138    public EsScrollResult scroll(EsScrollResult scrollResult) {
139        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
140        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
141                scrollResult.getKeepAlive());
142    }
143
144    @Override
145    public void clearScroll(EsScrollResult scrollResult) {
146        clearScroll(scrollResult.getScrollId());
147    }
148
149    protected void clearScroll(String scrollId) {
150        if (log.isDebugEnabled()) {
151            log.debug(String.format(
152                    "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'",
153                    scrollId));
154        }
155        ClearScrollRequest request = new ClearScrollRequest();
156        request.addScrollId(scrollId);
157        esa.getClient().clearScroll(request);
158    }
159
160    @SuppressWarnings("resource") // IterableQueryResult closed by EsResult.getRows().close()
161    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
162            long keepAlive) {
163        if (queryBuilder.returnsDocuments()) {
164            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
165            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
166        } else if (queryBuilder.returnsRows()) {
167            IterableQueryResult rows = getRows(queryBuilder, response);
168            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
169        }
170        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
171    }
172
173    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
174        DocumentModelListImpl ret;
175        long totalSize = response.getHits().getTotalHits().value;
176        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
177            ret = new DocumentModelListImpl(0);
178            ret.setTotalSize(totalSize);
179            return ret;
180        }
181        try (Context stopWatch = fetchTimer.time()) {
182            Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
183            ret = fetcher.fetchDocuments();
184            logMinDurationFetch(stopWatch.stop(), totalSize);
185        }
186        ret.setTotalSize(totalSize);
187        return ret;
188    }
189
190    private void logMinDurationFetch(long duration, long totalSize) {
191        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
192            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
193                    totalSize);
194            if (log.isTraceEnabled()) {
195                log.trace(msg, new Throwable("Slow fetch document stack trace"));
196            } else {
197                log.debug(msg);
198            }
199        }
200    }
201
202    protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
203        for (AggregateEsBase<Aggregation, Bucket> agg : queryBuilder.getAggregates()) {
204            Filter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
205            if (filter == null) {
206                continue;
207            }
208            Aggregation aggregation = filter.getAggregations().get(agg.getId());
209            if (aggregation == null) {
210                continue;
211            }
212            agg.parseAggregation(aggregation);
213        }
214        @SuppressWarnings("unchecked")
215        List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates();
216        return ret;
217    }
218
219    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
220        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
221    }
222
223    protected SearchResponse search(NxQueryBuilder query) {
224        try (Context ignored = searchTimer.time()) {
225            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
226            SearchRequest request = buildEsSearchRequest(query, searchType);
227            logSearchRequest(request, query, searchType);
228            SearchResponse response = esa.getClient().search(request);
229            logSearchResponse(response);
230            return response;
231        }
232    }
233
234    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
235        try (Context ignored = searchTimer.time()) {
236            SearchRequest request = buildEsSearchScrollRequest(query, searchType, keepAlive);
237            logSearchRequest(request, query, searchType);
238            SearchResponse response = esa.getClient().search(request);
239            logSearchResponse(response);
240            return response;
241        }
242    }
243
244    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
245        try (Context ignored = scrollTimer.time()) {
246            SearchScrollRequest request = buildEsScrollRequest(scrollId, keepAlive);
247            logScrollRequest(scrollId, keepAlive);
248            SearchResponse response = esa.getClient().searchScroll(request);
249            logSearchResponse(response);
250            return response;
251        }
252    }
253
254    protected SearchRequest buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
255        SearchRequest request = new SearchRequest(esa.getSearchIndexes(query.getSearchRepositories()));
256        request.searchType(searchType);
257        SearchSourceBuilder search = new SearchSourceBuilder();
258        query.updateRequest(search);
259        request.source(search);
260        if (query.isFetchFromElasticsearch()) {
261            // fetch the _source without the binaryfulltext field
262            search.fetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
263        }
264        return request;
265    }
266
267    protected SearchRequest buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, long keepAlive) {
268        SearchRequest request = buildEsSearchRequest(query, searchType);
269        request.scroll(new TimeValue(keepAlive));
270        return request;
271    }
272
273    protected SearchScrollRequest buildEsScrollRequest(String scrollId, long keepAlive) {
274        return new SearchScrollRequest(scrollId).scroll(new TimeValue(keepAlive));
275    }
276
277    protected void logSearchResponse(SearchResponse response) {
278        if (log.isDebugEnabled()) {
279            log.debug("Response: " + response.toString());
280        }
281    }
282
283    protected void logSearchRequest(SearchRequest request, NxQueryBuilder query, SearchType searchType) {
284        if (log.isDebugEnabled()) {
285            String scroll = request.scroll() != null ? "&scroll=" + request.scroll() : "";
286            log.debug(String.format(
287                    "Search query: curl -XGET 'http://localhost:9200/%s/_search?pretty&search_type=%s%s' -d '%s'",
288                    getSearchIndexesAsString(query), searchType.toString().toLowerCase(), scroll,
289                    request.source().toString()));
290        }
291    }
292
293    protected void logScrollRequest(String scrollId, long keepAlive) {
294        if (log.isDebugEnabled()) {
295            log.debug(String.format(
296                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
297                    keepAlive, scrollId));
298        }
299    }
300
301    protected String getSearchIndexesAsString(NxQueryBuilder query) {
302        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
303    }
304
305}