001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     bdelbosc
019 */
020
021package org.nuxeo.elasticsearch.core;
022
023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
024
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequestBuilder;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.index.query.QueryBuilder;
034import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
035import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModelList;
038import org.nuxeo.ecm.core.api.IterableQueryResult;
039import org.nuxeo.ecm.core.api.SortInfo;
040import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
041import org.nuxeo.ecm.platform.query.api.Aggregate;
042import org.nuxeo.ecm.platform.query.api.Bucket;
043import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
044import org.nuxeo.elasticsearch.api.ElasticSearchService;
045import org.nuxeo.elasticsearch.api.EsResult;
046import org.nuxeo.elasticsearch.fetcher.Fetcher;
047import org.nuxeo.elasticsearch.query.NxQueryBuilder;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.metrics.MetricsService;
050
051import com.codahale.metrics.MetricRegistry;
052import com.codahale.metrics.SharedMetricRegistries;
053import com.codahale.metrics.Timer;
054import com.codahale.metrics.Timer.Context;
055
056/**
057 * @since 6.0
058 */
059public class ElasticSearchServiceImpl implements ElasticSearchService {
060    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
061
062    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
063
064    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty(
065            LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
066
067    // Metrics
068    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
069
070    protected final Timer searchTimer;
071
072    protected final Timer fetchTimer;
073
074    private final ElasticSearchAdminImpl esa;
075
076    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
077        this.esa = esa;
078        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
079        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
080    }
081
082    @Deprecated
083    @Override
084    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos)
085            {
086        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
087        return query(query);
088    }
089
090    @Deprecated
091    @Override
092    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
093            SortInfo... sortInfos) {
094        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
095                sortInfos);
096        return query(query);
097    }
098
099    @Override
100    public DocumentModelList query(NxQueryBuilder queryBuilder) {
101        return queryAndAggregate(queryBuilder).getDocuments();
102    }
103
104    @Override
105    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
106        SearchResponse response = search(queryBuilder);
107        List<Aggregate> aggs = getAggregates(queryBuilder, response);
108        if (queryBuilder.returnsDocuments()) {
109            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
110            return new EsResult(docs, aggs, response);
111        } else if (queryBuilder.returnsRows()) {
112            IterableQueryResult rows = getRows(queryBuilder, response);
113            return new EsResult(rows, aggs, response);
114        }
115        return new EsResult(response);
116    }
117
118    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
119        DocumentModelListImpl ret;
120        long totalSize = response.getHits().getTotalHits();
121        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
122            ret = new DocumentModelListImpl(0);
123            ret.setTotalSize(totalSize);
124            return ret;
125        }
126        Context stopWatch = fetchTimer.time();
127        Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
128        try {
129            ret = fetcher.fetchDocuments();
130        } finally {
131            logMinDurationFetch(stopWatch.stop(), totalSize);
132        }
133        ret.setTotalSize(totalSize);
134        return ret;
135    }
136
137    private void logMinDurationFetch(long duration, long totalSize) {
138        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
139            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
140                    totalSize);
141            if (log.isTraceEnabled()) {
142                log.trace(msg, new Throwable("Slow fetch document stack trace"));
143            } else {
144                log.debug(msg);
145            }
146        }
147    }
148
149    protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
150        for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) {
151            InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
152            if (filter == null) {
153                continue;
154            }
155            MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId());
156            if (mba == null) {
157                continue;
158            }
159            agg.parseEsBuckets(mba.getBuckets());
160        }
161        @SuppressWarnings("unchecked")
162        List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates();
163        return ret;
164    }
165
166    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
167        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
168    }
169
170    protected SearchResponse search(NxQueryBuilder query) {
171        Context stopWatch = searchTimer.time();
172        try {
173            SearchRequestBuilder request = buildEsSearchRequest(query);
174            logSearchRequest(request, query);
175            SearchResponse response = request.execute().actionGet();
176            logSearchResponse(response);
177            return response;
178        } finally {
179            stopWatch.stop();
180        }
181    }
182
183    protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query) {
184        SearchRequestBuilder request = esa.getClient().prepareSearch(
185                esa.getSearchIndexes(query.getSearchRepositories())).setTypes(DOC_TYPE).setSearchType(
186                SearchType.DFS_QUERY_THEN_FETCH);
187        query.updateRequest(request);
188        if (query.isFetchFromElasticsearch()) {
189            // fetch the _source without the binaryfulltext field
190            request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
191        }
192        return request;
193    }
194
195    protected void logSearchResponse(SearchResponse response) {
196        if (log.isDebugEnabled()) {
197            log.debug("Response: " + response.toString());
198        }
199    }
200
201    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query) {
202        if (log.isDebugEnabled()) {
203            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
204                    getSearchIndexesAsString(query), DOC_TYPE, request.toString()));
205        }
206    }
207
208    protected String getSearchIndexesAsString(NxQueryBuilder query) {
209        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
210    }
211
212}