001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 024 025import java.util.List; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.ClearScrollRequest; 031import org.elasticsearch.action.search.SearchRequest; 032import org.elasticsearch.action.search.SearchResponse; 033import org.elasticsearch.action.search.SearchScrollRequest; 034import org.elasticsearch.action.search.SearchType; 035import org.elasticsearch.common.unit.TimeValue; 036import org.elasticsearch.index.query.QueryBuilder; 037import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; 038import org.elasticsearch.search.aggregations.bucket.filter.Filter; 039import org.elasticsearch.search.builder.SearchSourceBuilder; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModelList; 042import org.nuxeo.ecm.core.api.IterableQueryResult; 043import org.nuxeo.ecm.core.api.SortInfo; 044import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 045import org.nuxeo.ecm.platform.query.api.Aggregate; 046import org.nuxeo.ecm.platform.query.api.Bucket; 047import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 048import org.nuxeo.elasticsearch.api.ElasticSearchService; 049import org.nuxeo.elasticsearch.api.EsResult; 050import org.nuxeo.elasticsearch.api.EsScrollResult; 051import org.nuxeo.elasticsearch.fetcher.Fetcher; 052import org.nuxeo.elasticsearch.query.NxQueryBuilder; 053import org.nuxeo.runtime.api.Framework; 054import org.nuxeo.runtime.metrics.MetricsService; 055 056import com.codahale.metrics.MetricRegistry; 057import com.codahale.metrics.SharedMetricRegistries; 058import com.codahale.metrics.Timer; 059import com.codahale.metrics.Timer.Context; 060 061/** 062 * @since 6.0 063 */ 064public class ElasticSearchServiceImpl implements ElasticSearchService { 065 private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class); 066 067 private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms"; 068 069 private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong( 070 Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000; 071 072 // Metrics 073 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 074 075 protected final Timer searchTimer; 076 077 protected final Timer scrollTimer; 078 079 protected final Timer fetchTimer; 080 081 private final ElasticSearchAdminImpl esa; 082 083 public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) { 084 this.esa = esa; 085 searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search")); 086 scrollTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "scroll")); 087 fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch")); 088 } 089 090 @Deprecated 091 @Override 092 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) { 093 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 094 return query(query); 095 } 096 097 @Deprecated 098 @Override 099 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 100 SortInfo... sortInfos) { 101 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort( 102 sortInfos); 103 return query(query); 104 } 105 106 @Override 107 public DocumentModelList query(NxQueryBuilder queryBuilder) { 108 return queryAndAggregate(queryBuilder).getDocuments(); 109 } 110 111 @Override 112 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 113 SearchResponse response = search(queryBuilder); 114 List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response); 115 if (queryBuilder.returnsDocuments()) { 116 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 117 return new EsResult(docs, aggs, response); 118 } else if (queryBuilder.returnsRows()) { 119 IterableQueryResult rows = getRows(queryBuilder, response); 120 return new EsResult(rows, aggs, response); 121 } 122 return new EsResult(response); 123 } 124 125 @Override 126 public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) { 127 return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive); 128 } 129 130 protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) { 131 SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive); 132 return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive); 133 } 134 135 @Override 136 public EsScrollResult scroll(EsScrollResult scrollResult) { 137 SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive()); 138 return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(), 139 scrollResult.getKeepAlive()); 140 } 141 142 @Override 143 public void clearScroll(EsScrollResult scrollResult) { 144 clearScroll(scrollResult.getScrollId()); 145 } 146 147 protected void clearScroll(String scrollId) { 148 if (log.isDebugEnabled()) { 149 log.debug(String.format( 150 "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'", 151 scrollId)); 152 } 153 ClearScrollRequest request = new ClearScrollRequest(); 154 request.addScrollId(scrollId); 155 esa.getClient().clearScroll(request); 156 } 157 158 protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId, 159 long keepAlive) { 160 if (queryBuilder.returnsDocuments()) { 161 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 162 return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive); 163 } else if (queryBuilder.returnsRows()) { 164 IterableQueryResult rows = getRows(queryBuilder, response); 165 return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive); 166 } 167 return new EsScrollResult(response, queryBuilder, scrollId, keepAlive); 168 } 169 170 protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) { 171 DocumentModelListImpl ret; 172 long totalSize = response.getHits().getTotalHits(); 173 if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) { 174 ret = new DocumentModelListImpl(0); 175 ret.setTotalSize(totalSize); 176 return ret; 177 } 178 try (Context stopWatch = fetchTimer.time()) { 179 Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap()); 180 ret = fetcher.fetchDocuments(); 181 logMinDurationFetch(stopWatch.stop(), totalSize); 182 } 183 ret.setTotalSize(totalSize); 184 return ret; 185 } 186 187 private void logMinDurationFetch(long duration, long totalSize) { 188 if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) { 189 String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0, 190 totalSize); 191 if (log.isTraceEnabled()) { 192 log.trace(msg, new Throwable("Slow fetch document stack trace")); 193 } else { 194 log.debug(msg); 195 } 196 } 197 } 198 199 protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) { 200 for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) { 201 Filter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg)); 202 if (filter == null) { 203 continue; 204 } 205 MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId()); 206 if (mba == null) { 207 continue; 208 } 209 agg.parseEsBuckets(mba.getBuckets()); 210 } 211 @SuppressWarnings("unchecked") 212 List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates(); 213 return ret; 214 } 215 216 private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) { 217 return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes()); 218 } 219 220 protected SearchResponse search(NxQueryBuilder query) { 221 try (Context ignored = searchTimer.time()) { 222 SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH; 223 SearchRequest request = buildEsSearchRequest(query, searchType); 224 logSearchRequest(request, query, searchType); 225 SearchResponse response = esa.getClient().search(request); 226 logSearchResponse(response); 227 return response; 228 } 229 } 230 231 protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) { 232 try (Context ignored = searchTimer.time()) { 233 SearchRequest request = buildEsSearchScrollRequest(query, searchType, keepAlive); 234 logSearchRequest(request, query, searchType); 235 SearchResponse response = esa.getClient().search(request); 236 logSearchResponse(response); 237 return response; 238 } 239 } 240 241 protected SearchResponse nextScroll(String scrollId, long keepAlive) { 242 try (Context ignored = scrollTimer.time()) { 243 SearchScrollRequest request = buildEsScrollRequest(scrollId, keepAlive); 244 logScrollRequest(scrollId, keepAlive); 245 SearchResponse response = esa.getClient().searchScroll(request); 246 logSearchResponse(response); 247 return response; 248 } 249 } 250 251 protected SearchRequest buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) { 252 SearchRequest request = new SearchRequest(esa.getSearchIndexes(query.getSearchRepositories())); 253 request.searchType(searchType); 254 SearchSourceBuilder search = new SearchSourceBuilder(); 255 query.updateRequest(search); 256 request.source(search); 257 if (query.isFetchFromElasticsearch()) { 258 // fetch the _source without the binaryfulltext field 259 search.fetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields()); 260 } 261 return request; 262 } 263 264 protected SearchRequest buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, long keepAlive) { 265 SearchRequest request = buildEsSearchRequest(query, searchType); 266 request.scroll(new TimeValue(keepAlive)); 267 return request; 268 } 269 270 protected SearchScrollRequest buildEsScrollRequest(String scrollId, long keepAlive) { 271 return new SearchScrollRequest(scrollId).scroll(new TimeValue(keepAlive)); 272 } 273 274 protected void logSearchResponse(SearchResponse response) { 275 if (log.isDebugEnabled()) { 276 log.debug("Response: " + response.toString()); 277 } 278 } 279 280 protected void logSearchRequest(SearchRequest request, NxQueryBuilder query, SearchType searchType) { 281 if (log.isDebugEnabled()) { 282 String scroll = request.scroll() != null ? "&scroll=" + request.scroll() : ""; 283 log.debug(String.format( 284 "Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty&search_type=%s%s' -d '%s'", 285 getSearchIndexesAsString(query), DOC_TYPE, searchType.toString().toLowerCase(), scroll, 286 request.source().toString())); 287 } 288 } 289 290 protected void logScrollRequest(String scrollId, long keepAlive) { 291 if (log.isDebugEnabled()) { 292 log.debug(String.format( 293 "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'", 294 keepAlive, scrollId)); 295 } 296 } 297 298 protected String getSearchIndexesAsString(NxQueryBuilder query) { 299 return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ','); 300 } 301 302}