/*
 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.core;

import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
import org.nuxeo.ecm.platform.query.api.Aggregate;
import org.nuxeo.ecm.platform.query.api.Bucket;
import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.api.EsScrollResult;
import org.nuxeo.elasticsearch.fetcher.Fetcher;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.dropwizard.metrics5.Timer;
import io.dropwizard.metrics5.Timer.Context;

/**
 * {@link ElasticSearchService} implementation that builds search and scroll requests from an
 * {@link NxQueryBuilder}, submits them through the client exposed by {@link ElasticSearchAdminImpl} and converts the
 * responses into documents, rows and aggregates. Search, scroll and fetch durations are recorded in metrics timers.
 *
 * @since 6.0
 */
public class ElasticSearchServiceImpl implements ElasticSearchService {
    private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class);

    /** Framework property holding the slow-fetch logging threshold, expressed in milliseconds. */
    private static final String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms";

    /** Slow-fetch logging threshold converted to nanoseconds; the property defaults to 200 (ms) when unset. */
    private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(
            Framework.getProperty(LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000;

    // Metrics
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    /** Times {@link #search(NxQueryBuilder)} and {@link #searchScroll(NxQueryBuilder, SearchType, long)}. */
    protected final Timer searchTimer;

    /** Times {@link #nextScroll(String, long)}. */
    protected final Timer scrollTimer;

    /** Times the document fetch done in {@link #getDocumentModels(NxQueryBuilder, SearchResponse)}. */
    protected final Timer fetchTimer;

    private final ElasticSearchAdminImpl esa;

    /**
     * Creates the service and registers its three timers under the same metric name, distinguished by the
     * {@code service} tag ({@code search}, {@code scroll}, {@code fetch}).
     *
     * @param esa the admin component providing the client, index names and repository map
     */
    public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) {
        this.esa = esa;
        searchTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "search"));
        scrollTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "scroll"));
        fetchTimer = registry.timer(MetricName.build("nuxeo.elasticsearch.service.timer").tagged("service", "fetch"));
    }

    /**
     * Wraps the NXQL query, paging and sort arguments in an {@link NxQueryBuilder} and delegates to
     * {@link #query(NxQueryBuilder)}.
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos);
        return query(query);
    }

    /**
     * Wraps the native Elasticsearch query, paging and sort arguments in an {@link NxQueryBuilder} and delegates to
     * {@link #query(NxQueryBuilder)}.
     */
    @Deprecated
    @Override
    public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset,
            SortInfo... sortInfos) {
        NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder)
                                                          .limit(limit)
                                                          .offset(offset)
                                                          .addSort(sortInfos);
        return query(query);
    }

    /**
     * Runs the query and returns only the documents part of {@link #queryAndAggregate(NxQueryBuilder)}.
     */
    @Override
    public DocumentModelList query(NxQueryBuilder queryBuilder) {
        return queryAndAggregate(queryBuilder).getDocuments();
    }

    /**
     * Executes the search and packages the response as documents, rows or the raw response only, depending on what
     * the query builder declares it returns. Aggregates are parsed from the response before the result is built.
     *
     * @param queryBuilder the query to execute
     * @return the result wrapping documents or rows with aggregates, or only the response when neither applies
     */
    @SuppressWarnings("resource") // IterableQueryResult closed by EsResult.getRows().close()
    @Override
    public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) {
        SearchResponse response = search(queryBuilder);
        List<Aggregate<Bucket>> aggs = getAggregates(queryBuilder, response);
        if (queryBuilder.returnsDocuments()) {
            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
            return new EsResult(docs, aggs, response);
        } else if (queryBuilder.returnsRows()) {
            IterableQueryResult rows = getRows(queryBuilder, response);
            return new EsResult(rows, aggs, response);
        }
        return new EsResult(response);
    }

    /**
     * Starts a scroll using the {@link SearchType#DFS_QUERY_THEN_FETCH} search type.
     *
     * @param queryBuilder the query to scroll over
     * @param keepAlive the scroll keep-alive, handed to {@code new TimeValue(keepAlive)}
     */
    @Override
    public EsScrollResult scroll(NxQueryBuilder queryBuilder, long keepAlive) {
        return scroll(queryBuilder, SearchType.DFS_QUERY_THEN_FETCH, keepAlive);
    }

    /**
     * Starts a scroll with an explicit search type and converts the first response into a scroll result.
     */
    protected EsScrollResult scroll(NxQueryBuilder queryBuilder, SearchType searchType, long keepAlive) {
        SearchResponse response = searchScroll(queryBuilder, searchType, keepAlive);
        return getScrollResults(queryBuilder, response, response.getScrollId(), keepAlive);
    }

    /**
     * Fetches the next batch of a scroll, reusing the query builder and keep-alive of the previous result. The
     * returned result carries the scroll id of the new response.
     */
    @Override
    public EsScrollResult scroll(EsScrollResult scrollResult) {
        SearchResponse response = nextScroll(scrollResult.getScrollId(), scrollResult.getKeepAlive());
        return getScrollResults(scrollResult.getQueryBuilder(), response, response.getScrollId(),
                scrollResult.getKeepAlive());
    }

    /**
     * Clears the scroll identified by the given result's scroll id.
     */
    @Override
    public void clearScroll(EsScrollResult scrollResult) {
        clearScroll(scrollResult.getScrollId());
    }

    /**
     * Sends a clear-scroll request for the given scroll id, logging an equivalent curl command at debug level.
     */
    protected void clearScroll(String scrollId) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Clear scroll : curl -XDELETE 'http://localhost:9200/_search/scroll' -d '{\"scroll_id\" : [\"%s\"]}'",
                    scrollId));
        }
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
        esa.getClient().clearScroll(request);
    }

    /**
     * Builds the scroll result for a response: documents, rows, or the bare response, depending on what the query
     * builder declares it returns.
     */
    @SuppressWarnings("resource") // IterableQueryResult closed by EsResult.getRows().close()
    protected EsScrollResult getScrollResults(NxQueryBuilder queryBuilder, SearchResponse response, String scrollId,
            long keepAlive) {
        if (queryBuilder.returnsDocuments()) {
            DocumentModelListImpl docs = getDocumentModels(queryBuilder, response);
            return new EsScrollResult(docs, response, queryBuilder, scrollId, keepAlive);
        } else if (queryBuilder.returnsRows()) {
            IterableQueryResult rows = getRows(queryBuilder, response);
            return new EsScrollResult(rows, response, queryBuilder, scrollId, keepAlive);
        }
        return new EsScrollResult(response, queryBuilder, scrollId, keepAlive);
    }

    /**
     * Turns the hits of a response into a document list whose total size is the response's total hit count.
     * <p>
     * When the query does not return documents, or the response has no hits, an empty list is returned without
     * invoking the fetcher. Otherwise the documents are loaded through the query builder's {@link Fetcher} and the
     * elapsed time is recorded in {@link #fetchTimer}.
     *
     * @param queryBuilder the query that produced the response
     * @param response the search response to convert
     * @return the fetched documents, never {@code null} on normal completion
     */
    protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) {
        DocumentModelListImpl ret;
        long totalSize = response.getHits().getTotalHits().value;
        if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) {
            ret = new DocumentModelListImpl(0);
            ret.setTotalSize(totalSize);
            return ret;
        }
        Context stopWatch = fetchTimer.time();
        long duration;
        try {
            Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap());
            ret = fetcher.fetchDocuments();
        } finally {
            // Each stop() call updates the timer, and Context.close() is itself a stop(): stopping inside a
            // try-with-resources recorded every fetch twice. Stop exactly once, including when the fetch throws.
            duration = stopWatch.stop();
        }
        logMinDurationFetch(duration, totalSize);
        ret.setTotalSize(totalSize);
        return ret;
    }

    /**
     * Logs fetches slower than {@link #LOG_MIN_DURATION_FETCH_NS} when debug is enabled; at trace level the message
     * is logged with a stack trace identifying the caller.
     *
     * @param duration the fetch duration in nanoseconds
     * @param totalSize the total hit count reported in the message
     */
    private void logMinDurationFetch(long duration, long totalSize) {
        if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) {
            String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0,
                    totalSize);
            if (log.isTraceEnabled()) {
                log.trace(msg, new Throwable("Slow fetch document stack trace"));
            } else {
                log.debug(msg);
            }
        }
    }

    /**
     * Feeds each aggregate of the query with its matching aggregation from the response. An aggregate is skipped
     * when either its filter wrapper or the nested aggregation is absent from the response.
     *
     * @return the query builder's aggregates, parsed in place
     */
    protected List<Aggregate<Bucket>> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) {
        for (AggregateEsBase<Aggregation, Bucket> agg : queryBuilder.getAggregates()) {
            // the aggregation is looked up inside a filter aggregation keyed by the aggregate's filter id
            Filter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg));
            if (filter == null) {
                continue;
            }
            Aggregation aggregation = filter.getAggregations().get(agg.getId());
            if (aggregation == null) {
                continue;
            }
            agg.parseAggregation(aggregation);
        }
        @SuppressWarnings("unchecked")
        List<Aggregate<Bucket>> ret = (List<Aggregate<Bucket>>) (List<?>) queryBuilder.getAggregates();
        return ret;
    }

    /**
     * Wraps the response in a row-oriented result using the query's selected fields and types.
     */
    private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) {
        return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes());
    }

    /**
     * Executes a {@link SearchType#DFS_QUERY_THEN_FETCH} search for the query, timed by {@link #searchTimer}.
     */
    protected SearchResponse search(NxQueryBuilder query) {
        try (Context ignored = searchTimer.time()) {
            SearchType searchType = SearchType.DFS_QUERY_THEN_FETCH;
            SearchRequest request = buildEsSearchRequest(query, searchType);
            logSearchRequest(request, query, searchType);
            SearchResponse response = esa.getClient().search(request);
            logSearchResponse(response);
            return response;
        }
    }

    /**
     * Executes the initial search of a scroll; like {@link #search(NxQueryBuilder)} it is timed by
     * {@link #searchTimer}.
     */
    protected SearchResponse searchScroll(NxQueryBuilder query, SearchType searchType, long keepAlive) {
        try (Context ignored = searchTimer.time()) {
            SearchRequest request = buildEsSearchScrollRequest(query, searchType, keepAlive);
            logSearchRequest(request, query, searchType);
            SearchResponse response = esa.getClient().search(request);
            logSearchResponse(response);
            return response;
        }
    }

    /**
     * Requests the next batch of an existing scroll, timed by {@link #scrollTimer}.
     */
    protected SearchResponse nextScroll(String scrollId, long keepAlive) {
        try (Context ignored = scrollTimer.time()) {
            SearchScrollRequest request = buildEsScrollRequest(scrollId, keepAlive);
            logScrollRequest(scrollId, keepAlive);
            SearchResponse response = esa.getClient().searchScroll(request);
            logSearchResponse(response);
            return response;
        }
    }

    /**
     * Builds the search request targeting the indexes of the query's repositories, letting the query builder fill
     * the search source.
     */
    protected SearchRequest buildEsSearchRequest(NxQueryBuilder query, SearchType searchType) {
        SearchRequest request = new SearchRequest(esa.getSearchIndexes(query.getSearchRepositories()));
        request.searchType(searchType);
        SearchSourceBuilder search = new SearchSourceBuilder();
        query.updateRequest(search);
        request.source(search);
        if (query.isFetchFromElasticsearch()) {
            // fetch the _source without the binaryfulltext field
            search.fetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields());
        }
        return request;
    }

    /**
     * Builds a regular search request and enables scrolling on it with the given keep-alive.
     */
    protected SearchRequest buildEsSearchScrollRequest(NxQueryBuilder query, SearchType searchType, long keepAlive) {
        SearchRequest request = buildEsSearchRequest(query, searchType);
        request.scroll(new TimeValue(keepAlive));
        return request;
    }

    /**
     * Builds the request for the next batch of the given scroll id.
     */
    protected SearchScrollRequest buildEsScrollRequest(String scrollId, long keepAlive) {
        return new SearchScrollRequest(scrollId).scroll(new TimeValue(keepAlive));
    }

    /**
     * Logs the response at debug level.
     */
    protected void logSearchResponse(SearchResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("Response: " + response.toString());
        }
    }

    /**
     * Logs, at debug level, a curl command equivalent to the search request, including the scroll parameter when
     * the request has one.
     */
    protected void logSearchRequest(SearchRequest request, NxQueryBuilder query, SearchType searchType) {
        if (log.isDebugEnabled()) {
            String scroll = request.scroll() != null ? "&scroll=" + request.scroll() : "";
            log.debug(String.format(
                    "Search query: curl -XGET 'http://localhost:9200/%s/_search?pretty&search_type=%s%s' -d '%s'",
                    getSearchIndexesAsString(query), searchType.toString().toLowerCase(), scroll,
                    request.source().toString()));
        }
    }

    /**
     * Logs, at debug level, a curl command equivalent to the scroll request.
     */
    protected void logScrollRequest(String scrollId, long keepAlive) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Scroll search: curl -XGET 'http://localhost:9200/_search/scroll?pretty' -d '{\"scroll\" : \"%d\", \"scroll_id\" : \"%s\"}'",
                    keepAlive, scrollId));
        }
    }

    /**
     * Returns the comma-separated index names searched for the query's repositories.
     */
    protected String getSearchIndexesAsString(NxQueryBuilder query) {
        return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ',');
    }

}