001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     bdelbosc
017 */
018
019package org.nuxeo.elasticsearch.core;
020
021import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
022
023import java.util.List;
024
025import org.apache.commons.lang.StringUtils;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.search.SearchRequestBuilder;
029import org.elasticsearch.action.search.SearchResponse;
030import org.elasticsearch.action.search.SearchType;
031import org.elasticsearch.index.query.QueryBuilder;
032import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
033import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.DocumentModelList;
036import org.nuxeo.ecm.core.api.IterableQueryResult;
037import org.nuxeo.ecm.core.api.SortInfo;
038import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
039import org.nuxeo.ecm.platform.query.api.Aggregate;
040import org.nuxeo.ecm.platform.query.api.Bucket;
041import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
042import org.nuxeo.elasticsearch.api.ElasticSearchService;
043import org.nuxeo.elasticsearch.api.EsResult;
044import org.nuxeo.elasticsearch.fetcher.Fetcher;
045import org.nuxeo.elasticsearch.query.NxQueryBuilder;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.metrics.MetricsService;
048
049import com.codahale.metrics.MetricRegistry;
050import com.codahale.metrics.SharedMetricRegistries;
051import com.codahale.metrics.Timer;
052import com.codahale.metrics.Timer.Context;
053
054/**
055 * @since 6.0
056 */
057public class ElasticSearchServiceImpl implements ElasticSearchService {
058    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);
059
060    private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";
061
062    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty(
063            LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;
064
065    // Metrics
066    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
067
068    protected final Timer searchTimer;
069
070    protected final Timer fetchTimer;
071
072    private final ElasticSearchAdminImpl esa;
073
074    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
075        this.esa = esa;
076        searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search"));
077        fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch"));
078    }
079
080    @Deprecated
081    @Override
082    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos)
083            {
084        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
085        return query(query);
086    }
087
088    @Deprecated
089    @Override
090    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
091            SortInfo... sortInfos) {
092        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort(
093                sortInfos);
094        return query(query);
095    }
096
097    @Override
098    public DocumentModelList query(NxQueryBuilder queryBuilder) {
099        return queryAndAggregate(queryBuilder).getDocuments();
100    }
101
102    @Override
103    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
104        SearchResponse response = search(queryBuilder);
105        List<Aggregate> aggs = getAggregates(queryBuilder, response);
106        if (queryBuilder.returnsDocuments()) {
107            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
108            return new EsResult(docs, aggs, response);
109        } else if (queryBuilder.returnsRows()) {
110            IterableQueryResult rows = getRows(queryBuilder, response);
111            return new EsResult(rows, aggs, response);
112        }
113        return new EsResult(response);
114    }
115
116    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
117        DocumentModelListImpl ret;
118        long totalSize = response.getHits().getTotalHits();
119        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
120            ret = new DocumentModelListImpl(0);
121            ret.setTotalSize(totalSize);
122            return ret;
123        }
124        Context stopWatch = fetchTimer.time();
125        Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
126        try {
127            ret = fetcher.fetchDocuments();
128        } finally {
129            logMinDurationFetch(stopWatch.stop(), totalSize);
130        }
131        ret.setTotalSize(totalSize);
132        return ret;
133    }
134
135    private void logMinDurationFetch(long duration, long totalSize) {
136        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
137            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
138                    totalSize);
139            if (log.isTraceEnabled()) {
140                log.trace(msg, new Throwable("Slow fetch document stack trace"));
141            } else {
142                log.debug(msg);
143            }
144        }
145    }
146
147    protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
148        for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) {
149            InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
150            if (filter == null) {
151                continue;
152            }
153            MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId());
154            if (mba == null) {
155                continue;
156            }
157            agg.parseEsBuckets(mba.getBuckets());
158        }
159        @SuppressWarnings("unchecked")
160        List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates();
161        return ret;
162    }
163
164    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
165        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
166    }
167
168    protected SearchResponse search(NxQueryBuilder query) {
169        Context stopWatch = searchTimer.time();
170        try {
171            SearchRequestBuilder request = buildEsSearchRequest(query);
172            logSearchRequest(request, query);
173            SearchResponse response = request.execute().actionGet();
174            logSearchResponse(response);
175            return response;
176        } finally {
177            stopWatch.stop();
178        }
179    }
180
181    protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query) {
182        SearchRequestBuilder request = esa.getClient().prepareSearch(
183                esa.getSearchIndexes(query.getSearchRepositories())).setTypes(DOC_TYPE).setSearchType(
184                SearchType.DFS_QUERY_THEN_FETCH);
185        query.updateRequest(request);
186        if (query.isFetchFromElasticsearch()) {
187            // fetch the _source without the binaryfulltext field
188            request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
189        }
190        return request;
191    }
192
193    protected void logSearchResponse(SearchResponse response) {
194        if (log.isDebugEnabled()) {
195            log.debug("Response: " + response.toString());
196        }
197    }
198
199    protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query) {
200        if (log.isDebugEnabled()) {
201            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
202                    getSearchIndexesAsString(query), DOC_TYPE, request.toString()));
203        }
204    }
205
206    protected String getSearchIndexesAsString(NxQueryBuilder query) {
207        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
208    }
209
210}