001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * bdelbosc 019 */ 020 021package org.nuxeo.elasticsearch.core; 022 023import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 024 025import java.util.List; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequestBuilder; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchType; 033import org.elasticsearch.index.query.QueryBuilder; 034import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; 035import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModelList; 038import org.nuxeo.ecm.core.api.IterableQueryResult; 039import org.nuxeo.ecm.core.api.SortInfo; 040import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 041import org.nuxeo.ecm.platform.query.api.Aggregate; 042import org.nuxeo.ecm.platform.query.api.Bucket; 043import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 044import org.nuxeo.elasticsearch.api.ElasticSearchService; 045import org.nuxeo.elasticsearch.api.EsResult; 046import org.nuxeo.elasticsearch.fetcher.Fetcher; 047import org.nuxeo.elasticsearch.query.NxQueryBuilder; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.metrics.MetricsService; 050 051import com.codahale.metrics.MetricRegistry; 052import com.codahale.metrics.SharedMetricRegistries; 053import com.codahale.metrics.Timer; 054import com.codahale.metrics.Timer.Context; 055 056/** 057 * @since 6.0 058 */ 059public class ElasticSearchServiceImpl implements ElasticSearchService { 060 private static final Log log = LogFactory.getLog(ElasticSearchServiceImpl.class); 061 062 private static final java.lang.String LOG_MIN_DURATION_FETCH_KEY = "org.nuxeo.elasticsearch.core.log_min_duration_fetch_ms"; 063 064 private static final long LOG_MIN_DURATION_FETCH_NS = Long.parseLong(Framework.getProperty( 065 LOG_MIN_DURATION_FETCH_KEY, "200")) * 1000000; 066 067 // Metrics 068 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 069 070 protected final Timer searchTimer; 071 072 protected final Timer fetchTimer; 073 074 private final ElasticSearchAdminImpl esa; 075 076 public ElasticSearchServiceImpl(ElasticSearchAdminImpl esa) { 077 this.esa = esa; 078 searchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "search")); 079 fetchTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "fetch")); 080 } 081 082 @Deprecated 083 @Override 084 public DocumentModelList query(CoreSession session, String nxql, int limit, int offset, SortInfo... sortInfos) 085 { 086 NxQueryBuilder query = new NxQueryBuilder(session).nxql(nxql).limit(limit).offset(offset).addSort(sortInfos); 087 return query(query); 088 } 089 090 @Deprecated 091 @Override 092 public DocumentModelList query(CoreSession session, QueryBuilder queryBuilder, int limit, int offset, 093 SortInfo... sortInfos) { 094 NxQueryBuilder query = new NxQueryBuilder(session).esQuery(queryBuilder).limit(limit).offset(offset).addSort( 095 sortInfos); 096 return query(query); 097 } 098 099 @Override 100 public DocumentModelList query(NxQueryBuilder queryBuilder) { 101 return queryAndAggregate(queryBuilder).getDocuments(); 102 } 103 104 @Override 105 public EsResult queryAndAggregate(NxQueryBuilder queryBuilder) { 106 SearchResponse response = search(queryBuilder); 107 List<Aggregate> aggs = getAggregates(queryBuilder, response); 108 if (queryBuilder.returnsDocuments()) { 109 DocumentModelListImpl docs = getDocumentModels(queryBuilder, response); 110 return new EsResult(docs, aggs, response); 111 } else if (queryBuilder.returnsRows()) { 112 IterableQueryResult rows = getRows(queryBuilder, response); 113 return new EsResult(rows, aggs, response); 114 } 115 return new EsResult(response); 116 } 117 118 protected DocumentModelListImpl getDocumentModels(NxQueryBuilder queryBuilder, SearchResponse response) { 119 DocumentModelListImpl ret; 120 long totalSize = response.getHits().getTotalHits(); 121 if (!queryBuilder.returnsDocuments() || response.getHits().getHits().length == 0) { 122 ret = new DocumentModelListImpl(0); 123 ret.setTotalSize(totalSize); 124 return ret; 125 } 126 Context stopWatch = fetchTimer.time(); 127 Fetcher fetcher = queryBuilder.getFetcher(response, esa.getRepositoryMap()); 128 try { 129 ret = fetcher.fetchDocuments(); 130 } finally { 131 logMinDurationFetch(stopWatch.stop(), totalSize); 132 } 133 ret.setTotalSize(totalSize); 134 return ret; 135 } 136 137 private void logMinDurationFetch(long duration, long totalSize) { 138 if (log.isDebugEnabled() && (duration > LOG_MIN_DURATION_FETCH_NS)) { 139 String msg = String.format("Slow fetch duration_ms:\t%.2f\treturning:\t%d documents", duration / 1000000.0, 140 totalSize); 141 if (log.isTraceEnabled()) { 142 log.trace(msg, new Throwable("Slow fetch document stack trace")); 143 } else { 144 log.debug(msg); 145 } 146 } 147 } 148 149 protected List<Aggregate> getAggregates(NxQueryBuilder queryBuilder, SearchResponse response) { 150 for (AggregateEsBase<? extends Bucket> agg : queryBuilder.getAggregates()) { 151 InternalFilter filter = response.getAggregations().get(NxQueryBuilder.getAggregateFilterId(agg)); 152 if (filter == null) { 153 continue; 154 } 155 MultiBucketsAggregation mba = filter.getAggregations().get(agg.getId()); 156 if (mba == null) { 157 continue; 158 } 159 agg.parseEsBuckets(mba.getBuckets()); 160 } 161 @SuppressWarnings("unchecked") 162 List<Aggregate> ret = (List<Aggregate>) (List<?>) queryBuilder.getAggregates(); 163 return ret; 164 } 165 166 private IterableQueryResult getRows(NxQueryBuilder queryBuilder, SearchResponse response) { 167 return new EsResultSetImpl(response, queryBuilder.getSelectFieldsAndTypes()); 168 } 169 170 protected SearchResponse search(NxQueryBuilder query) { 171 Context stopWatch = searchTimer.time(); 172 try { 173 SearchRequestBuilder request = buildEsSearchRequest(query); 174 logSearchRequest(request, query); 175 SearchResponse response = request.execute().actionGet(); 176 logSearchResponse(response); 177 return response; 178 } finally { 179 stopWatch.stop(); 180 } 181 } 182 183 protected SearchRequestBuilder buildEsSearchRequest(NxQueryBuilder query) { 184 SearchRequestBuilder request = esa.getClient().prepareSearch( 185 esa.getSearchIndexes(query.getSearchRepositories())).setTypes(DOC_TYPE).setSearchType( 186 SearchType.DFS_QUERY_THEN_FETCH); 187 query.updateRequest(request); 188 if (query.isFetchFromElasticsearch()) { 189 // fetch the _source without the binaryfulltext field 190 request.setFetchSource(esa.getIncludeSourceFields(), esa.getExcludeSourceFields()); 191 } 192 return request; 193 } 194 195 protected void logSearchResponse(SearchResponse response) { 196 if (log.isDebugEnabled()) { 197 log.debug("Response: " + response.toString()); 198 } 199 } 200 201 protected void logSearchRequest(SearchRequestBuilder request, NxQueryBuilder query) { 202 if (log.isDebugEnabled()) { 203 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 204 getSearchIndexesAsString(query), DOC_TYPE, request.toString())); 205 } 206 } 207 208 protected String getSearchIndexesAsString(NxQueryBuilder query) { 209 return StringUtils.join(esa.getSearchIndexes(query.getSearchRepositories()), ','); 210 } 211 212}