001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Tiry 016 * bdelbosc 017 */ 018 019package org.nuxeo.elasticsearch.core; 020 021import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 022 023import java.util.List; 024 025import org.apache.commons.lang.StringUtils; 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.action.search.SearchRequestBuilder; 029import org.elasticsearch.action.search.SearchResponse; 030import org.elasticsearch.action.search.SearchType; 031import org.elasticsearch.index.query.QueryBuilder; 032import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; 033import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; 034import org.nuxeo.ecm.core.api.CoreSession; 035import org.nuxeo.ecm.core.api.DocumentModelList; 036import org.nuxeo.ecm.core.api.IterableQueryResult; 037import org.nuxeo.ecm.core.api.SortInfo; 038import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 039import org.nuxeo.ecm.platform.query.api.Aggregate; 040import org.nuxeo.ecm.platform.query.api.Bucket; 041import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 042import org.nuxeo.elasticsearch.api.ElasticSearchService; 043import org.nuxeo.elasticsearch.api.EsResult; 044import org.nuxeo.elasticsearch.fetcher.Fetcher; 045import org.nuxeo.elasticsearch.query.NxQueryBuilder; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.metrics.MetricsService; 048 049import com.codahale.metrics.MetricRegistry; 050import com.codahale.metrics.SharedMetricRegistries; 051import com.codahale.metrics.Timer; 052import com.codahale.metrics.Timer.Context; 053 054/** 055 * @since 6.0 056 */ 057public class ElasticSearchServiceImpl implements ElasticSearchService { 058 private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class); 059 060 private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms"; 061 062 private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty( 063 LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000; 064 065 // Metrics 066 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 067 068 protected final Timer searchTimer; 069 070 protected final Timer fetchTimer; 071 072 private final ElasticSearchAdminImpl esa; 073 074 public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) { 075 this.esa = esa; 076 searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search")); 077 fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch")); 078 } 079 080 @Deprecated 081 @Override 082 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) 083 { 084 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 085 return query(query); 086 } 087 088 @Deprecated 089 @Override 090 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 091 SortInfo... sortInfos) { 092 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort( 093 sortInfos); 094 return query(query); 095 } 096 097 @Override 098 public DocumentModelList query(NxQueryBuilder queryBuilder) { 099 return queryAndAggregate(queryBuilder).getDocuments(); 100 } 101 102 @Override 103 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 104 SearchResponse response = search(queryBuilder); 105 List<Aggregate> aggs = getAggregates(queryBuilder, response); 106 if (queryBuilder.returnsDocuments()) { 107 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 108 return new EsResult(docs, aggs, response); 109 } else if (queryBuilder.returnsRows()) { 110 IterableQueryResult rows = getRows(queryBuilder, response); 111 return new EsResult(rows, aggs, response); 112 } 113 return new EsResult(response); 114 } 115 116 protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) { 117 DocumentModelListImpl ret; 118 long totalSize = response.getHits().getTotalHits(); 119 if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) { 120 ret = new DocumentModelListImpl(0); 121 ret.setTotalSize(totalSize); 122 return ret; 123 } 124 Context stopWatch = fetchTimer.time(); 125 Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap()); 126 try { 127 ret = fetcher.fetchDocuments(); 128 } finally { 129 logMinDurationFetch(stopWatch.stop(), totalSize); 130 } 131 ret.setTotalSize(totalSize); 132 return ret; 133 } 134 135 private void logMinDurationFetch(long duration, long totalSize) { 136 if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) { 137 String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0, 138 totalSize); 139 if (log.isTraceEnabled()) { 140 log.trace(msg, new Throwable("Slow fetch document stack trace")); 141 } else { 142 log.debug(msg); 143 } 144 } 145 } 146 147 protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) { 148 for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) { 149 InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg)); 150 if (filter == null) { 151 continue; 152 } 153 MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId()); 154 if (mba == null) { 155 continue; 156 } 157 agg.parseEsBuckets(mba.getBuckets()); 158 } 159 @SuppressWarnings("unchecked") 160 List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates(); 161 return ret; 162 } 163 164 private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) { 165 return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes()); 166 } 167 168 protected SearchResponse search(NxQueryBuilder query) { 169 Context stopWatch = searchTimer.time(); 170 try { 171 SearchRequestBuilder request = buildEsSearchRequest(query); 172 logSearchRequest(request, query); 173 SearchResponse response = request.execute().actionGet(); 174 logSearchResponse(response); 175 return response; 176 } finally { 177 stopWatch.stop(); 178 } 179 } 180 181 protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query) { 182 SearchRequestBuilder request = esa.getClient().prepareSearch( 183 esa.getSearchIndexes(query.getSearchRepositories())).setTypes(DOC_TYPE).setSearchType( 184 SearchType.DFS_QUERY_THEN_FETCH); 185 query.updateRequest(request); 186 if (query.isFetchFromElasticsearch()) { 187 // fetch the _source without the binaryfulltext field 188 request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields()); 189 } 190 return request; 191 } 192 193 protected void logSearchResponse(SearchResponse response) { 194 if (log.isDebugEnabled()) { 195 log.debug("Response: " + response.toString()); 196 } 197 } 198 199 protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query) { 200 if (log.isDebugEnabled()) { 201 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 202 getSearchIndexesAsString(query), DOC_TYPE, request.toString())); 203 } 204 } 205 206 protected String getSearchIndexesAsString(NxQueryBuilder query) { 207 return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ','); 208 } 209 210}