001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;

import java.util.List;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
import org.nuxeo.ecm.platform.query.api.Aggregate;
import org.nuxeo.ecm.platform.query.api.Bucket;
import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.api.EsScrollResult;
import org.nuxeo.elasticsearch.fetcher.Fetcher;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
058
059/**
060 * @since 6.0
061 */
062public class ElasticSearchServiceImpl implements ElasticSearchService {
063    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
064
065    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
066
067    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(
068            Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
069
070    // Metrics
071    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
072
073    protected final Timer searchTimer;
074
075    protected final Timer scrollTimer;
076
077    protected final Timer fetchTimer;
078
079    private final ElasticSearchAdminImpl esa;
080
081    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
082        this.esa = esa;
083        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
084        scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll"));
085        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
086    }
087
088    @Deprecated
089    @Override
090    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
091        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
092        return query(query);
093    }
094
095    @Deprecated
096    @Override
097    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
098            SortInfo... sortInfos) {
099        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
100                                                          .limit(limit)
101                                                          .offset(offset)
102                                                          .addSort(sortInfos);
103        return query(query);
104    }
105
106    @Override
107    public DocumentModelList query(NxQueryBuilder queryBuilder) {
108        return queryAndAggregate(queryBuilder).getDocuments();
109    }
110
111    @Override
112    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
113        SearchResponse response = search(queryBuilder);
114        List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response);
115        if (queryBuilder.returnsDocuments()) {
116            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
117            return new EsResult(docs, aggs, response);
118        } else if (queryBuilder.returnsRows()) {
119            IterableQueryResult rows = getRows(queryBuilder, response);
120            return new EsResult(rows, aggs, response);
121        }
122        return new EsResult(response);
123    }
124
125    @Override
126    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
127        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
128    }
129
130    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
131        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
132        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
133    }
134
135    @Override
136    public EsScrollResult scroll(EsScrollResult scrollResult) {
137        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
138        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
139                scrollResult.getKeepAlive());
140    }
141
142    @Override
143    public void clearScroll(EsScrollResult scrollResult) {
144        clearScroll(scrollResult.getScrollId());
145    }
146
147    protected void clearScroll(String scrollId) {
148        if (log.isDebugEnabled()) {
149            log.debug(String.format(
150                    "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'",
151                    scrollId));
152        }
153        esa.getClient().prepareClearScroll().addScrollId(scrollId).execute().actionGet();
154    }
155
156    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
157            long keepAlive) {
158        if (queryBuilder.returnsDocuments()) {
159            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
160            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
161        } else if (queryBuilder.returnsRows()) {
162            IterableQueryResult rows = getRows(queryBuilder, response);
163            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
164        }
165        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
166    }
167
168    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
169        DocumentModelListImpl ret;
170        long totalSize = response.getHits().getTotalHits();
171        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
172            ret = new DocumentModelListImpl(0);
173            ret.setTotalSize(totalSize);
174            return ret;
175        }
176        Context stopWatch = fetchTimer.time();
177        Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
178        try {
179            ret = fetcher.fetchDocuments();
180        } finally {
181            logMinDurationFetch(stopWatch.stop(), totalSize);
182        }
183        ret.setTotalSize(totalSize);
184        return ret;
185    }
186
187    private void logMinDurationFetch(long duration, long totalSize) {
188        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
189            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
190                    totalSize);
191            if (log.isTraceEnabled()) {
192                log.trace(msg, new Throwable("Slow fetch document stack trace"));
193            } else {
194                log.debug(msg);
195            }
196        }
197    }
198
199    protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
200        for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) {
201            InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
202            if (filter == null) {
203                continue;
204            }
205            MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId());
206            if (mba == null) {
207                continue;
208            }
209            agg.parseEsBuckets(mba.getBuckets());
210        }
211        @SuppressWarnings("unchecked")
212        List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates();
213        return ret;
214    }
215
216    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
217        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
218    }
219
220    protected SearchResponse search(NxQueryBuilder query) {
221        Context stopWatch = searchTimer.time();
222        try {
223            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
224            SearchRequestBuilder request = buildEsSearchRequest(query, searchType);
225            logSearchRequest(request, query, searchType);
226            SearchResponse response = request.execute().actionGet();
227            logSearchResponse(response);
228            return response;
229        } finally {
230            stopWatch.stop();
231        }
232    }
233
234    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
235        Context stopWatch = searchTimer.time();
236        try {
237            SearchRequestBuilder request = buildEsSearchScrollRequest(query, searchType, keepAlive);
238            logSearchRequest(request, query, searchType, keepAlive);
239            SearchResponse response = request.execute().actionGet();
240            logSearchResponse(response);
241            return response;
242        } finally {
243            stopWatch.stop();
244        }
245    }
246
247    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
248        Context stopWatch = scrollTimer.time();
249        try {
250            SearchScrollRequestBuilder request = buildEsScrollRequest(scrollId, keepAlive);
251            logScrollRequest(scrollId, keepAlive);
252            SearchResponse response = request.execute().actionGet();
253            logSearchResponse(response);
254            return response;
255        } finally {
256            stopWatch.stop();
257        }
258    }
259
260    protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
261        SearchRequestBuilder request = esa.getClient()
262                                          .prepareSearch(esa.getSearchIndexes(query.getSearchRepositories()))
263                                          .setTypes(DOC_TYPE)
264                                          .setSearchType(searchType);
265        query.updateRequest(request);
266        if (query.isFetchFromElasticsearch()) {
267            // fetch the _source without the binaryfulltext field
268            request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
269        }
270        return request;
271    }
272
273    protected SearchRequestBuilder buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType,
274            long keepAlive) {
275        return buildEsSearchRequest(query, searchType).setScroll(new TimeValue(keepAlive)).setSize(query.getLimit());
276    }
277
278    protected SearchScrollRequestBuilder buildEsScrollRequest(String scrollId, long keepAlive) {
279        return esa.getClient().prepareSearchScroll(scrollId).setScroll(new TimeValue(keepAlive));
280    }
281
282    protected void logSearchResponse(SearchResponse response) {
283        if (log.isDebugEnabled()) {
284            log.debug("Response: " + response.toString());
285        }
286    }
287
288    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType) {
289        logSearchRequest(request, query, searchType, null);
290    }
291
292    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType,
293            Long keepAlive) {
294        if (log.isDebugEnabled()) {
295            String scroll = keepAlive != null ? "&scroll=" + keepAlive : "";
296            log.debug(String.format(
297                    "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'",
298                    getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll,
299                    request.toString()));
300        }
301    }
302
303    protected void logScrollRequest(String scrollId, long keepAlive) {
304        if (log.isDebugEnabled()) {
305            log.debug(String.format(
306                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
307                    keepAlive, scrollId));
308        }
309    }
310
311    protected String getSearchIndexesAsString(NxQueryBuilder query) {
312        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
313    }
314
315}