001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
024
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.ClearScrollRequest;
031import org.elasticsearch.action.search.SearchRequest;
032import org.elasticsearch.action.search.SearchResponse;
033import org.elasticsearch.action.search.SearchScrollRequest;
034import org.elasticsearch.action.search.SearchType;
035import org.elasticsearch.common.unit.TimeValue;
036import org.elasticsearch.index.query.QueryBuilder;
037import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
038import org.elasticsearch.search.aggregations.bucket.filter.Filter;
039import org.elasticsearch.search.builder.SearchSourceBuilder;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModelList;
042import org.nuxeo.ecm.core.api.IterableQueryResult;
043import org.nuxeo.ecm.core.api.SortInfo;
044import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
045import org.nuxeo.ecm.platform.query.api.Aggregate;
046import org.nuxeo.ecm.platform.query.api.Bucket;
047import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
048import org.nuxeo.elasticsearch.api.ElasticSearchService;
049import org.nuxeo.elasticsearch.api.EsResult;
050import org.nuxeo.elasticsearch.api.EsScrollResult;
051import org.nuxeo.elasticsearch.fetcher.Fetcher;
052import org.nuxeo.elasticsearch.query.NxQueryBuilder;
053import org.nuxeo.runtime.api.Framework;
054import org.nuxeo.runtime.metrics.MetricsService;
055
056import com.codahale.metrics.MetricRegistry;
057import com.codahale.metrics.SharedMetricRegistries;
058import com.codahale.metrics.Timer;
059import com.codahale.metrics.Timer.Context;
060
061/**
062 * @since 6.0
063 */
064public class ElasticSearchServiceImpl implements ElasticSearchService {
065    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
066
067    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
068
069    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(
070            Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
071
072    // Metrics
073    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
074
075    protected final Timer searchTimer;
076
077    protected final Timer scrollTimer;
078
079    protected final Timer fetchTimer;
080
081    private final ElasticSearchAdminImpl esa;
082
083    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
084        this.esa = esa;
085        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
086        scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll"));
087        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
088    }
089
090    @Deprecated
091    @Override
092    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
093        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
094        return query(query);
095    }
096
097    @Deprecated
098    @Override
099    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
100            SortInfo... sortInfos) {
101        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
102                sortInfos);
103        return query(query);
104    }
105
106    @Override
107    public DocumentModelList query(NxQueryBuilder queryBuilder) {
108        return queryAndAggregate(queryBuilder).getDocuments();
109    }
110
111    @Override
112    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
113        SearchResponse response = search(queryBuilder);
114        List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response);
115        if (queryBuilder.returnsDocuments()) {
116            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
117            return new EsResult(docs, aggs, response);
118        } else if (queryBuilder.returnsRows()) {
119            IterableQueryResult rows = getRows(queryBuilder, response);
120            return new EsResult(rows, aggs, response);
121        }
122        return new EsResult(response);
123    }
124
125    @Override
126    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
127        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
128    }
129
130    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
131        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
132        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
133    }
134
135    @Override
136    public EsScrollResult scroll(EsScrollResult scrollResult) {
137        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
138        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
139                scrollResult.getKeepAlive());
140    }
141
142    @Override
143    public void clearScroll(EsScrollResult scrollResult) {
144        clearScroll(scrollResult.getScrollId());
145    }
146
147    protected void clearScroll(String scrollId) {
148        if (log.isDebugEnabled()) {
149            log.debug(String.format(
150                    "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'",
151                    scrollId));
152        }
153        ClearScrollRequest request = new ClearScrollRequest();
154        request.addScrollId(scrollId);
155        esa.getClient().clearScroll(request);
156    }
157
158    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
159            long keepAlive) {
160        if (queryBuilder.returnsDocuments()) {
161            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
162            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
163        } else if (queryBuilder.returnsRows()) {
164            IterableQueryResult rows = getRows(queryBuilder, response);
165            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
166        }
167        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
168    }
169
170    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
171        DocumentModelListImpl ret;
172        long totalSize = response.getHits().getTotalHits();
173        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
174            ret = new DocumentModelListImpl(0);
175            ret.setTotalSize(totalSize);
176            return ret;
177        }
178        try (Context stopWatch = fetchTimer.time()) {
179            Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
180            ret = fetcher.fetchDocuments();
181            logMinDurationFetch(stopWatch.stop(), totalSize);
182        }
183        ret.setTotalSize(totalSize);
184        return ret;
185    }
186
187    private void logMinDurationFetch(long duration, long totalSize) {
188        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
189            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
190                    totalSize);
191            if (log.isTraceEnabled()) {
192                log.trace(msg, new Throwable("Slow fetch document stack trace"));
193            } else {
194                log.debug(msg);
195            }
196        }
197    }
198
199    protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
200        for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) {
201            Filter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
202            if (filter == null) {
203                continue;
204            }
205            MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId());
206            if (mba == null) {
207                continue;
208            }
209            agg.parseEsBuckets(mba.getBuckets());
210        }
211        @SuppressWarnings("unchecked")
212        List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates();
213        return ret;
214    }
215
216    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
217        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
218    }
219
220    protected SearchResponse search(NxQueryBuilder query) {
221        try (Context ignored = searchTimer.time()) {
222            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
223            SearchRequest request = buildEsSearchRequest(query, searchType);
224            logSearchRequest(request, query, searchType);
225            SearchResponse response = esa.getClient().search(request);
226            logSearchResponse(response);
227            return response;
228        }
229    }
230
231    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
232        try (Context ignored = searchTimer.time()) {
233            SearchRequest request = buildEsSearchScrollRequest(query, searchType, keepAlive);
234            logSearchRequest(request, query, searchType);
235            SearchResponse response = esa.getClient().search(request);
236            logSearchResponse(response);
237            return response;
238        }
239    }
240
241    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
242        try (Context ignored = scrollTimer.time()) {
243            SearchScrollRequest request = buildEsScrollRequest(scrollId, keepAlive);
244            logScrollRequest(scrollId, keepAlive);
245            SearchResponse response = esa.getClient().searchScroll(request);
246            logSearchResponse(response);
247            return response;
248        }
249    }
250
251    protected SearchRequest buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
252        SearchRequest request = new SearchRequest(esa.getSearchIndexes(query.getSearchRepositories()));
253        request.searchType(searchType);
254        SearchSourceBuilder search = new SearchSourceBuilder();
255        query.updateRequest(search);
256        request.source(search);
257        if (query.isFetchFromElasticsearch()) {
258            // fetch the _source without the binaryfulltext field
259            search.fetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
260        }
261        return request;
262    }
263
264    protected SearchRequest buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, long keepAlive) {
265        SearchRequest request = buildEsSearchRequest(query, searchType);
266        request.scroll(new TimeValue(keepAlive));
267        return request;
268    }
269
270    protected SearchScrollRequest buildEsScrollRequest(String scrollId, long keepAlive) {
271        return new SearchScrollRequest(scrollId).scroll(new TimeValue(keepAlive));
272    }
273
274    protected void logSearchResponse(SearchResponse response) {
275        if (log.isDebugEnabled()) {
276            log.debug("Response: " + response.toString());
277        }
278    }
279
280    protected void logSearchRequest(SearchRequest request, NxQueryBuilder query, SearchType searchType) {
281        if (log.isDebugEnabled()) {
282            String scroll = request.scroll() != null ? "&scroll=" + request.scroll() : "";
283            log.debug(String.format(
284                    "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'",
285                    getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll,
286                    request.source().toString()));
287        }
288    }
289
290    protected void logScrollRequest(String scrollId, long keepAlive) {
291        if (log.isDebugEnabled()) {
292            log.debug(String.format(
293                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
294                    keepAlive, scrollId));
295        }
296    }
297
298    protected String getSearchIndexesAsString(NxQueryBuilder query) {
299        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
300    }
301
302}