001/*
002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
024
025import java.util.List;
026
027import org.apache.commons.lang3.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.ClearScrollRequest;
031import org.elasticsearch.action.search.SearchRequest;
032import org.elasticsearch.action.search.SearchResponse;
033import org.elasticsearch.action.search.SearchScrollRequest;
034import org.elasticsearch.action.search.SearchType;
035import org.elasticsearch.common.unit.TimeValue;
036import org.elasticsearch.index.query.QueryBuilder;
037import org.elasticsearch.search.aggregations.Aggregation;
038import org.elasticsearch.search.aggregations.bucket.filter.Filter;
039import org.elasticsearch.search.builder.SearchSourceBuilder;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModelList;
042import org.nuxeo.ecm.core.api.IterableQueryResult;
043import org.nuxeo.ecm.core.api.SortInfo;
044import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
045import org.nuxeo.ecm.platform.query.api.Aggregate;
046import org.nuxeo.ecm.platform.query.api.Bucket;
047import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
048import org.nuxeo.elasticsearch.api.ElasticSearchService;
049import org.nuxeo.elasticsearch.api.EsResult;
050import org.nuxeo.elasticsearch.api.EsScrollResult;
051import org.nuxeo.elasticsearch.fetcher.Fetcher;
052import org.nuxeo.elasticsearch.query.NxQueryBuilder;
053import org.nuxeo.runtime.api.Framework;
054import org.nuxeo.runtime.metrics.MetricsService;
055
056import com.codahale.metrics.MetricRegistry;
057import com.codahale.metrics.SharedMetricRegistries;
058import com.codahale.metrics.Timer;
059import com.codahale.metrics.Timer.Context;
060
061/**
062 * @since 6.0
063 */
064public class ElasticSearchServiceImpl implements ElasticSearchService {
065    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
066
067    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
068
069    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(
070            Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
071
072    // Metrics
073    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
074
075    protected final Timer searchTimer;
076
077    protected final Timer scrollTimer;
078
079    protected final Timer fetchTimer;
080
081    private final ElasticSearchAdminImpl esa;
082
083    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
084        this.esa = esa;
085        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
086        scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll"));
087        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
088    }
089
090    @Deprecated
091    @Override
092    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
093        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
094        return query(query);
095    }
096
097    @Deprecated
098    @Override
099    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
100            SortInfo... sortInfos) {
101        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
102                                                          .limit(limit)
103                                                          .offset(offset)
104                                                          .addSort(sortInfos);
105        return query(query);
106    }
107
108    @Override
109    public DocumentModelList query(NxQueryBuilder queryBuilder) {
110        return queryAndAggregate(queryBuilder).getDocuments();
111    }
112
113    @Override
114    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
115        SearchResponse response = search(queryBuilder);
116        List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response);
117        if (queryBuilder.returnsDocuments()) {
118            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
119            return new EsResult(docs, aggs, response);
120        } else if (queryBuilder.returnsRows()) {
121            IterableQueryResult rows = getRows(queryBuilder, response);
122            return new EsResult(rows, aggs, response);
123        }
124        return new EsResult(response);
125    }
126
127    @Override
128    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
129        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
130    }
131
132    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
133        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
134        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
135    }
136
137    @Override
138    public EsScrollResult scroll(EsScrollResult scrollResult) {
139        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
140        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
141                scrollResult.getKeepAlive());
142    }
143
144    @Override
145    public void clearScroll(EsScrollResult scrollResult) {
146        clearScroll(scrollResult.getScrollId());
147    }
148
149    protected void clearScroll(String scrollId) {
150        if (log.isDebugEnabled()) {
151            log.debug(String.format(
152                    "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'",
153                    scrollId));
154        }
155        ClearScrollRequest request = new ClearScrollRequest();
156        request.addScrollId(scrollId);
157        esa.getClient().clearScroll(request);
158    }
159
160    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
161            long keepAlive) {
162        if (queryBuilder.returnsDocuments()) {
163            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
164            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
165        } else if (queryBuilder.returnsRows()) {
166            IterableQueryResult rows = getRows(queryBuilder, response);
167            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
168        }
169        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
170    }
171
172    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
173        DocumentModelListImpl ret;
174        long totalSize = response.getHits().getTotalHits();
175        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
176            ret = new DocumentModelListImpl(0);
177            ret.setTotalSize(totalSize);
178            return ret;
179        }
180        try (Context stopWatch = fetchTimer.time()) {
181            Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
182            ret = fetcher.fetchDocuments();
183            logMinDurationFetch(stopWatch.stop(), totalSize);
184        }
185        ret.setTotalSize(totalSize);
186        return ret;
187    }
188
189    private void logMinDurationFetch(long duration, long totalSize) {
190        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
191            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
192                    totalSize);
193            if (log.isTraceEnabled()) {
194                log.trace(msg, new Throwable("Slow fetch document stack trace"));
195            } else {
196                log.debug(msg);
197            }
198        }
199    }
200
201    protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
202        for (AggregateEsBase<Aggregation, Bucket> agg : queryBuilder.getAggregates()) {
203            Filter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
204            if (filter == null) {
205                continue;
206            }
207            Aggregation aggregation = filter.getAggregations().get(agg.getId());
208            if (aggregation == null) {
209                continue;
210            }
211            agg.parseAggregation(aggregation);
212        }
213        @SuppressWarnings("unchecked")
214        List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates();
215        return ret;
216    }
217
218    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
219        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
220    }
221
222    protected SearchResponse search(NxQueryBuilder query) {
223        try (Context ignored = searchTimer.time()) {
224            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
225            SearchRequest request = buildEsSearchRequest(query, searchType);
226            logSearchRequest(request, query, searchType);
227            SearchResponse response = esa.getClient().search(request);
228            logSearchResponse(response);
229            return response;
230        }
231    }
232
233    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
234        try (Context ignored = searchTimer.time()) {
235            SearchRequest request = buildEsSearchScrollRequest(query, searchType, keepAlive);
236            logSearchRequest(request, query, searchType);
237            SearchResponse response = esa.getClient().search(request);
238            logSearchResponse(response);
239            return response;
240        }
241    }
242
243    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
244        try (Context ignored = scrollTimer.time()) {
245            SearchScrollRequest request = buildEsScrollRequest(scrollId, keepAlive);
246            logScrollRequest(scrollId, keepAlive);
247            SearchResponse response = esa.getClient().searchScroll(request);
248            logSearchResponse(response);
249            return response;
250        }
251    }
252
253    protected SearchRequest buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
254        SearchRequest request = new SearchRequest(esa.getSearchIndexes(query.getSearchRepositories()));
255        request.searchType(searchType);
256        SearchSourceBuilder search = new SearchSourceBuilder();
257        query.updateRequest(search);
258        request.source(search);
259        if (query.isFetchFromElasticsearch()) {
260            // fetch the _source without the binaryfulltext field
261            search.fetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
262        }
263        return request;
264    }
265
266    protected SearchRequest buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, long keepAlive) {
267        SearchRequest request = buildEsSearchRequest(query, searchType);
268        request.scroll(new TimeValue(keepAlive));
269        return request;
270    }
271
272    protected SearchScrollRequest buildEsScrollRequest(String scrollId, long keepAlive) {
273        return new SearchScrollRequest(scrollId).scroll(new TimeValue(keepAlive));
274    }
275
276    protected void logSearchResponse(SearchResponse response) {
277        if (log.isDebugEnabled()) {
278            log.debug("Response: " + response.toString());
279        }
280    }
281
282    protected void logSearchRequest(SearchRequest request, NxQueryBuilder query, SearchType searchType) {
283        if (log.isDebugEnabled()) {
284            String scroll = request.scroll() != null ? "&scroll=" + request.scroll() : "";
285            log.debug(String.format(
286                    "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'",
287                    getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll,
288                    request.source().toString()));
289        }
290    }
291
292    protected void logScrollRequest(String scrollId, long keepAlive) {
293        if (log.isDebugEnabled()) {
294            log.debug(String.format(
295                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
296                    keepAlive, scrollId));
297        }
298    }
299
300    protected String getSearchIndexesAsString(NxQueryBuilder query) {
301        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
302    }
303
304}