001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
024
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequestBuilder;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchScrollRequestBuilder;
033import org.elasticsearch.action.search.SearchType;
034import org.elasticsearch.common.unit.TimeValue;
035import org.elasticsearch.index.query.QueryBuilder;
036import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
037import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModelList;
040import org.nuxeo.ecm.core.api.IterableQueryResult;
041import org.nuxeo.ecm.core.api.SortInfo;
042import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
043import org.nuxeo.ecm.platform.query.api.Aggregate;
044import org.nuxeo.ecm.platform.query.api.Bucket;
045import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
046import org.nuxeo.elasticsearch.api.ElasticSearchService;
047import org.nuxeo.elasticsearch.api.EsResult;
048import org.nuxeo.elasticsearch.api.EsScrollResult;
049import org.nuxeo.elasticsearch.fetcher.Fetcher;
050import org.nuxeo.elasticsearch.query.NxQueryBuilder;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.metrics.MetricsService;
053
054import com.codahale.metrics.MetricRegistry;
055import com.codahale.metrics.SharedMetricRegistries;
056import com.codahale.metrics.Timer;
057import com.codahale.metrics.Timer.Context;
058
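/**
 * {@link ElasticSearchService} implementation that runs search and scroll requests through the Elasticsearch client
 * exposed by {@link ElasticSearchAdminImpl} and times them with metrics timers.
 *
 * @since 6.0
 */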
062public class ElasticSearchServiceImpl implements ElasticSearchService {
063    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
064
065    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
066
067    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty(
068            LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
069
070    // Metrics
071    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
072
073    protected final Timer searchTimer;
074
075    protected final Timer scrollTimer;
076
077    protected final Timer fetchTimer;
078
079    private final ElasticSearchAdminImpl esa;
080
081    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
082        this.esa = esa;
083        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
084        scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll"));
085        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
086    }
087
088    @Deprecated
089    @Override
090    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
091        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
092        return query(query);
093    }
094
095    @Deprecated
096    @Override
097    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
098            SortInfo... sortInfos) {
099        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
100                                                          .limit(limit)
101                                                          .offset(offset)
102                                                          .addSort(sortInfos);
103        return query(query);
104    }
105
106    @Override
107    public DocumentModelList query(NxQueryBuilder queryBuilder) {
108        return queryAndAggregate(queryBuilder).getDocuments();
109    }
110
111    @Override
112    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
113        SearchResponse response = search(queryBuilder);
114        List<Aggregate> aggs = getAggregates(queryBuilder, response);
115        if (queryBuilder.returnsDocuments()) {
116            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
117            return new EsResult(docs, aggs, response);
118        } else if (queryBuilder.returnsRows()) {
119            IterableQueryResult rows = getRows(queryBuilder, response);
120            return new EsResult(rows, aggs, response);
121        }
122        return new EsResult(response);
123    }
124
125    @Override
126    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
127        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
128    }
129
130    @Override
131    public EsScrollResult scanAndScroll(NxQueryBuilder queryBuilder, long keepAlive) {
132        return scroll(queryBuilder, SearchType.SCAN, keepAlive);
133    }
134
135    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
136        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
137        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
138    }
139
140    @Override
141    public EsScrollResult scroll(EsScrollResult scrollResult) {
142        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
143        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
144                scrollResult.getKeepAlive());
145    }
146
147    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
148            long keepAlive) {
149        if (queryBuilder.returnsDocuments()) {
150            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
151            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
152        } else if (queryBuilder.returnsRows()) {
153            IterableQueryResult rows = getRows(queryBuilder, response);
154            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
155        }
156        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
157    }
158
159    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
160        DocumentModelListImpl ret;
161        long totalSize = response.getHits().getTotalHits();
162        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
163            ret = new DocumentModelListImpl(0);
164            ret.setTotalSize(totalSize);
165            return ret;
166        }
167        Context stopWatch = fetchTimer.time();
168        Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
169        try {
170            ret = fetcher.fetchDocuments();
171        } finally {
172            logMinDurationFetch(stopWatch.stop(), totalSize);
173        }
174        ret.setTotalSize(totalSize);
175        return ret;
176    }
177
178    private void logMinDurationFetch(long duration, long totalSize) {
179        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
180            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
181                    totalSize);
182            if (log.isTraceEnabled()) {
183                log.trace(msg, new Throwable("Slow fetch document stack trace"));
184            } else {
185                log.debug(msg);
186            }
187        }
188    }
189
190    protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
191        for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) {
192            InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
193            if (filter == null) {
194                continue;
195            }
196            MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId());
197            if (mba == null) {
198                continue;
199            }
200            agg.parseEsBuckets(mba.getBuckets());
201        }
202        @SuppressWarnings("unchecked")
203        List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates();
204        return ret;
205    }
206
    /**
     * Wraps the response in a row-oriented result restricted to the fields selected by the query.
     *
     * @param queryBuilder the query providing the selected fields and their types
     * @param response the Elasticsearch response
     * @return a new {@link EsResultSetImpl} over the response
     */
    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
    }
210
211    protected SearchResponse search(NxQueryBuilder query) {
212        Context stopWatch = searchTimer.time();
213        try {
214            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
215            SearchRequestBuilder request = buildEsSearchRequest(query, searchType);
216            logSearchRequest(request, query, searchType);
217            SearchResponse response = request.execute().actionGet();
218            logSearchResponse(response);
219            return response;
220        } finally {
221            stopWatch.stop();
222        }
223    }
224
225    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
226        Context stopWatch = searchTimer.time();
227        try {
228            SearchRequestBuilder request = buildEsSearchScrollRequest(query, searchType, keepAlive);
229            logSearchRequest(request, query, searchType, keepAlive);
230            SearchResponse response = request.execute().actionGet();
231            logSearchResponse(response);
232            return response;
233        } finally {
234            stopWatch.stop();
235        }
236    }
237
238    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
239        Context stopWatch = scrollTimer.time();
240        try {
241            SearchScrollRequestBuilder request = buildEsScrollRequest(scrollId, keepAlive);
242            logScrollRequest(scrollId, keepAlive);
243            SearchResponse response = request.execute().actionGet();
244            logSearchResponse(response);
245            return response;
246        } finally {
247            stopWatch.stop();
248        }
249    }
250
251    protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
252        SearchRequestBuilder request = esa.getClient()
253                                          .prepareSearch(esa.getSearchIndexes(query.getSearchRepositories()))
254                                          .setTypes(DOC_TYPE)
255                                          .setSearchType(searchType);
256        query.updateRequest(request);
257        if (query.isFetchFromElasticsearch()) {
258            // fetch the _source without the binaryfulltext field
259            request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
260        }
261        return request;
262    }
263
264    protected SearchRequestBuilder buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType,
265            long keepAlive) {
266        return buildEsSearchRequest(query, searchType).setScroll(new TimeValue(keepAlive)).setSize(query.getLimit());
267    }
268
269    protected SearchScrollRequestBuilder buildEsScrollRequest(String scrollId, long keepAlive) {
270        return esa.getClient().prepareSearchScroll(scrollId).setScroll(new TimeValue(keepAlive));
271    }
272
273    protected void logSearchResponse(SearchResponse response) {
274        if (log.isDebugEnabled()) {
275            log.debug("Response: " + response.toString());
276        }
277    }
278
279    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType) {
280        logSearchRequest(request, query, searchType, null);
281    }
282
283    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType,
284            Long keepAlive) {
285        if (log.isDebugEnabled()) {
286            String scroll = keepAlive != null ? "&scroll=" + keepAlive : "";
287            log.debug(String.format(
288                    "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'",
289                    getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll,
290                    request.toString()));
291        }
292    }
293
294    protected void logScrollRequest(String scrollId, long keepAlive) {
295        if (log.isDebugEnabled()) {
296            log.debug(String.format(
297                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
298                    keepAlive, scrollId));
299        }
300    }
301
302    protected String getSearchIndexesAsString(NxQueryBuilder query) {
303        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
304    }
305
306}