001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 024 025import java.util.List; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequestBuilder; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchScrollRequestBuilder; 033import org.elasticsearch.action.search.SearchType; 034import org.elasticsearch.common.unit.TimeValue; 035import org.elasticsearch.index.query.QueryBuilder; 036import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; 037import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; 038import org.nuxeo.ecm.core.api.CoreSession; 039import org.nuxeo.ecm.core.api.DocumentModelList; 040import org.nuxeo.ecm.core.api.IterableQueryResult; 041import org.nuxeo.ecm.core.api.SortInfo; 042import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 043import org.nuxeo.ecm.platform.query.api.Aggregate; 044import org.nuxeo.ecm.platform.query.api.Bucket; 045import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 046import org.nuxeo.elasticsearch.api.ElasticSearchService; 047import org.nuxeo.elasticsearch.api.EsResult; 048import org.nuxeo.elasticsearch.api.EsScrollResult; 049import org.nuxeo.elasticsearch.fetcher.Fetcher; 050import org.nuxeo.elasticsearch.query.NxQueryBuilder; 051import org.nuxeo.runtime.api.Framework; 052import org.nuxeo.runtime.metrics.MetricsService; 053 054import com.codahale.metrics.MetricRegistry; 055import com.codahale.metrics.SharedMetricRegistries; 056import com.codahale.metrics.Timer; 057import com.codahale.metrics.Timer.Context; 058 059/** 060 * @since 6.0 061 */ 062public class ElasticSearchServiceImpl implements ElasticSearchService { 063 private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class); 064 065 private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms"; 066 067 private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty( 068 LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000; 069 070 // Metrics 071 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 072 073 protected final Timer searchTimer; 074 075 protected final Timer scrollTimer; 076 077 protected final Timer fetchTimer; 078 079 private final ElasticSearchAdminImpl esa; 080 081 public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) { 082 this.esa = esa; 083 searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search")); 084 scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll")); 085 fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch")); 086 } 087 088 @Deprecated 089 @Override 090 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 091 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 092 return query(query); 093 } 094 095 @Deprecated 096 @Override 097 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 098 SortInfo... sortInfos) { 099 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder) 100 .limit(limit) 101 .offset(offset) 102 .addSort(sortInfos); 103 return query(query); 104 } 105 106 @Override 107 public DocumentModelList query(NxQueryBuilder queryBuilder) { 108 return queryAndAggregate(queryBuilder).getDocuments(); 109 } 110 111 @Override 112 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 113 SearchResponse response = search(queryBuilder); 114 List<Aggregate> aggs = getAggregates(queryBuilder, response); 115 if (queryBuilder.returnsDocuments()) { 116 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 117 return new EsResult(docs, aggs, response); 118 } else if (queryBuilder.returnsRows()) { 119 IterableQueryResult rows = getRows(queryBuilder, response); 120 return new EsResult(rows, aggs, response); 121 } 122 return new EsResult(response); 123 } 124 125 @Override 126 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 127 return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive); 128 } 129 130 @Override 131 public EsScrollResult scanAndScroll(NxQueryBuilder queryBuilder, long keepAlive) { 132 return scroll(queryBuilder, SearchType.SCAN, keepAlive); 133 } 134 135 protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) { 136 SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive); 137 return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive); 138 } 139 140 @Override 141 public EsScrollResult scroll(EsScrollResult scrollResult) { 142 SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive()); 143 return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(), 144 scrollResult.getKeepAlive()); 145 } 146 147 protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId, 148 long keepAlive) { 149 if (queryBuilder.returnsDocuments()) { 150 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 151 return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive); 152 } else if (queryBuilder.returnsRows()) { 153 IterableQueryResult rows = getRows(queryBuilder, response); 154 return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive); 155 } 156 return new EsScrollResult(response, queryBuilder, scrollId, keepAlive); 157 } 158 159 protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) { 160 DocumentModelListImpl ret; 161 long totalSize = response.getHits().getTotalHits(); 162 if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) { 163 ret = new DocumentModelListImpl(0); 164 ret.setTotalSize(totalSize); 165 return ret; 166 } 167 Context stopWatch = fetchTimer.time(); 168 Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap()); 169 try { 170 ret = fetcher.fetchDocuments(); 171 } finally { 172 logMinDurationFetch(stopWatch.stop(), totalSize); 173 } 174 ret.setTotalSize(totalSize); 175 return ret; 176 } 177 178 private void logMinDurationFetch(long duration, long totalSize) { 179 if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) { 180 String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0, 181 totalSize); 182 if (log.isTraceEnabled()) { 183 log.trace(msg, new Throwable("Slow fetch document stack trace")); 184 } else { 185 log.debug(msg); 186 } 187 } 188 } 189 190 protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) { 191 for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) { 192 InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg)); 193 if (filter == null) { 194 continue; 195 } 196 MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId()); 197 if (mba == null) { 198 continue; 199 } 200 agg.parseEsBuckets(mba.getBuckets()); 201 } 202 @SuppressWarnings("unchecked") 203 List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates(); 204 return ret; 205 } 206 207 private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) { 208 return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes()); 209 } 210 211 protected SearchResponse search(NxQueryBuilder query) { 212 Context stopWatch = searchTimer.time(); 213 try { 214 SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH; 215 SearchRequestBuilder request = buildEsSearchRequest(query, searchType); 216 logSearchRequest(request, query, searchType); 217 SearchResponse response = request.execute().actionGet(); 218 logSearchResponse(response); 219 return response; 220 } finally { 221 stopWatch.stop(); 222 } 223 } 224 225 protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) { 226 Context stopWatch = searchTimer.time(); 227 try { 228 SearchRequestBuilder request = buildEsSearchScrollRequest(query, searchType, keepAlive); 229 logSearchRequest(request, query, searchType, keepAlive); 230 SearchResponse response = request.execute().actionGet(); 231 logSearchResponse(response); 232 return response; 233 } finally { 234 stopWatch.stop(); 235 } 236 } 237 238 protected SearchResponse nextScroll(String scrollId, long keepAlive) { 239 Context stopWatch = scrollTimer.time(); 240 try { 241 SearchScrollRequestBuilder request = buildEsScrollRequest(scrollId, keepAlive); 242 logScrollRequest(scrollId, keepAlive); 243 SearchResponse response = request.execute().actionGet(); 244 logSearchResponse(response); 245 return response; 246 } finally { 247 stopWatch.stop(); 248 } 249 } 250 251 protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) { 252 SearchRequestBuilder request = esa.getClient() 253 .prepareSearch(esa.getSearchIndexes(query.getSearchRepositories())) 254 .setTypes(DOC_TYPE) 255 .setSearchType(searchType); 256 query.updateRequest(request); 257 if (query.isFetchFromElasticsearch()) { 258 // fetch the _source without the binaryfulltext field 259 request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields()); 260 } 261 return request; 262 } 263 264 protected SearchRequestBuilder buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, 265 long keepAlive) { 266 return buildEsSearchRequest(query, searchType).setScroll(new TimeValue(keepAlive)).setSize(query.getLimit()); 267 } 268 269 protected SearchScrollRequestBuilder buildEsScrollRequest(String scrollId, long keepAlive) { 270 return esa.getClient().prepareSearchScroll(scrollId).setScroll(new TimeValue(keepAlive)); 271 } 272 273 protected void logSearchResponse(SearchResponse response) { 274 if (log.isDebugEnabled()) { 275 log.debug("Response: " + response.toString()); 276 } 277 } 278 279 protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType) { 280 logSearchRequest(request, query, searchType, null); 281 } 282 283 protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query, SearchType searchType, 284 Long keepAlive) { 285 if (log.isDebugEnabled()) { 286 String scroll = keepAlive != null ? "&scroll=" + keepAlive : ""; 287 log.debug(String.format( 288 "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'", 289 getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll, 290 request.toString())); 291 } 292 } 293 294 protected void logScrollRequest(String scrollId, long keepAlive) { 295 if (log.isDebugEnabled()) { 296 log.debug(String.format( 297 "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'", 298 keepAlive, scrollId)); 299 } 300 } 301 302 protected String getSearchIndexesAsString(NxQueryBuilder query) { 303 return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ','); 304 } 305 306}