001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020package org.nuxeo.elasticsearch.provider; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import org.apache.logging.log4j.LogManager; 029import org.apache.logging.log4j.Logger; 030import org.elasticsearch.index.query.QueryBuilder; 031import org.elasticsearch.search.aggregations.Aggregation; 032import org.nuxeo.ecm.core.api.CoreSession; 033import org.nuxeo.ecm.core.api.DocumentModel; 034import org.nuxeo.ecm.core.api.DocumentModelList; 035import org.nuxeo.ecm.core.api.NuxeoException; 036import org.nuxeo.ecm.core.query.QueryParseException; 037import org.nuxeo.ecm.platform.query.api.Aggregate; 038import org.nuxeo.ecm.platform.query.api.AggregateDefinition; 039import org.nuxeo.ecm.platform.query.api.Bucket; 040import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 041import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 042import org.nuxeo.elasticsearch.aggregate.AggregateFactory; 043import org.nuxeo.elasticsearch.api.ElasticSearchService; 044import org.nuxeo.elasticsearch.api.EsResult; 045import org.nuxeo.elasticsearch.query.NxQueryBuilder; 046import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 047import org.nuxeo.runtime.api.Framework; 048import org.nuxeo.runtime.services.config.ConfigurationService; 049 050/** 051 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider. 052 * 053 * @since 5.9.3 054 */ 055public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider { 056 057 public static final String CORE_SESSION_PROPERTY = "coreSession"; 058 059 public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories"; 060 061 // @since 9.2 062 public static final String ES_MAX_RESULT_WINDOW_PROPERTY = "org.nuxeo.elasticsearch.provider.maxResultWindow"; 063 064 // This is the default ES index.max_result_window 065 public static final String DEFAULT_ES_MAX_RESULT_WINDOW_VALUE = "10000"; 066 067 protected static final Logger log = LogManager.getLogger(ElasticSearchNxqlPageProvider.class); 068 069 private static final long serialVersionUID = 1L; 070 071 protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates; 072 073 protected Long maxResultWindow; 074 075 @Override 076 public List<DocumentModel> getCurrentPage() { 077 078 long t0 = System.currentTimeMillis(); 079 080 // use a cache 081 if (currentPageDocuments != null) { 082 return currentPageDocuments; 083 } 084 error = null; 085 errorMessage = null; 086 log.debug("Perform query for provider '{}': with pageSize={}, offset={}", this::getName, 087 this::getMinMaxPageSize, this::getCurrentPageOffset); 088 currentPageDocuments = new ArrayList<>(); 089 CoreSession coreSession = getCoreSession(); 090 if (query == null) { 091 buildQuery(coreSession); 092 } 093 if (query == null) { 094 throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName())); 095 } 096 // Build and execute the ES query 097 ElasticSearchService ess = Framework.getService(ElasticSearchService.class); 098 try { 099 NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query) 100 .offset((int) getCurrentPageOffset()) 101 .limit(getLimit()) 102 .addAggregates(buildAggregates()); 103 if (searchOnAllRepositories()) { 104 nxQuery.searchOnAllRepositories(); 105 } 106 107 List<String> highlightFields = getHighlights(); 108 if (highlightFields != null && !highlightFields.isEmpty()) { 109 nxQuery.highlight(highlightFields); 110 } 111 112 EsResult ret = ess.queryAndAggregate(nxQuery); 113 DocumentModelList dmList = ret.getDocuments(); 114 currentAggregates = new HashMap<>(ret.getAggregates().size()); 115 for (Aggregate<Bucket> agg : ret.getAggregates()) { 116 currentAggregates.put(agg.getId(), agg); 117 } 118 setResultsCount(dmList.totalSize()); 119 currentPageDocuments = dmList; 120 } catch (QueryParseException e) { 121 error = e; 122 errorMessage = e.getMessage(); 123 log.warn(e.getMessage(), e); 124 } 125 126 // send event for statistics ! 127 fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0); 128 129 return currentPageDocuments; 130 } 131 132 protected int getLimit() { 133 int ret = (int) getMinMaxPageSize(); 134 if (ret == 0) { 135 ret = (int) Long.min(getMaxResultWindow(), Integer.MAX_VALUE); 136 } 137 return ret; 138 } 139 140 public QueryBuilder getCurrentQueryAsEsBuilder() { 141 String nxql = getCurrentQuery(); 142 return NxqlQueryConverter.toESQueryBuilder(nxql); 143 } 144 145 @Override 146 protected void pageChanged() { 147 currentPageDocuments = null; 148 currentAggregates = null; 149 super.pageChanged(); 150 } 151 152 @Override 153 public void refresh() { 154 currentPageDocuments = null; 155 currentAggregates = null; 156 super.refresh(); 157 } 158 159 @Override 160 protected CoreSession getCoreSession() { 161 Map<String, Serializable> props = getProperties(); 162 CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY); 163 if (coreSession == null) { 164 throw new NuxeoException("cannot find core session"); 165 } 166 return coreSession; 167 } 168 169 private List<AggregateEsBase<? extends Aggregation, ? extends Bucket>> buildAggregates() { 170 ArrayList<AggregateEsBase<? extends Aggregation, ? extends Bucket>> ret = new ArrayList<>( 171 getAggregateDefinitions().size()); 172 boolean skip = isSkipAggregates(); 173 for (AggregateDefinition def : getAggregateDefinitions()) { 174 AggregateEsBase<? extends Aggregation, ? extends Bucket> agg = AggregateFactory.create(def, 175 getSearchDocumentModel()); 176 if (!skip || !agg.getSelection().isEmpty()) { 177 // if we want to skip aggregates but one is selected, it has to be computed to filter the result set 178 ret.add(AggregateFactory.create(def, getSearchDocumentModel())); 179 } 180 } 181 return ret; 182 } 183 184 protected boolean searchOnAllRepositories() { 185 String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY); 186 if (value == null) { 187 return false; 188 } 189 return Boolean.parseBoolean(value); 190 } 191 192 @Override 193 public boolean hasAggregateSupport() { 194 return true; 195 } 196 197 @Override 198 public Map<String, Aggregate<? extends Bucket>> getAggregates() { 199 getCurrentPage(); 200 return currentAggregates; 201 } 202 203 /** 204 * Extends the default implementation to add results of aggregates 205 * 206 * @since 7.4 207 */ 208 @Override 209 protected void incorporateAggregates(Map<String, Serializable> eventProps) { 210 211 super.incorporateAggregates(eventProps); 212 if (currentAggregates != null) { 213 HashMap<String, Serializable> aggregateMatches = new HashMap<>(); 214 for (String key : currentAggregates.keySet()) { 215 Aggregate<? extends Bucket> ag = currentAggregates.get(key); 216 ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<>(); 217 for (Bucket bucket : ag.getBuckets()) { 218 HashMap<String, Serializable> b = new HashMap<>(); 219 b.put("key", bucket.getKey()); 220 b.put("count", bucket.getDocCount()); 221 buckets.add(b); 222 } 223 aggregateMatches.put(key, buckets); 224 } 225 eventProps.put("aggregatesMatches", aggregateMatches); 226 } 227 } 228 229 @Override 230 public boolean isLastPageAvailable() { 231 if ((getResultsCount() + getPageSize()) <= getMaxResultWindow()) { 232 return super.isNextPageAvailable(); 233 } 234 return false; 235 } 236 237 @Override 238 public boolean isNextPageAvailable() { 239 if ((getCurrentPageOffset() + 2 * getPageSize()) <= getMaxResultWindow()) { 240 return super.isNextPageAvailable(); 241 } 242 return false; 243 } 244 245 @Override 246 public long getPageLimit() { 247 return getMaxResultWindow() / getPageSize(); 248 } 249 250 /** 251 * Returns the max result window where the PP can navigate without raising Elasticsearch 252 * QueryPhaseExecutionException. {@code from + size} must be less than or equal to this value. 253 * 254 * @since 9.2 255 */ 256 public long getMaxResultWindow() { 257 if (maxResultWindow == null) { 258 ConfigurationService cs = Framework.getService(ConfigurationService.class); 259 String maxResultWindowStr = cs.getProperty(ES_MAX_RESULT_WINDOW_PROPERTY, 260 DEFAULT_ES_MAX_RESULT_WINDOW_VALUE); 261 try { 262 maxResultWindow = Long.valueOf(maxResultWindowStr); 263 } catch (NumberFormatException e) { 264 log.warn("Invalid maxResultWindow property value: %s for page provider: %s, fallback to default.", 265 maxResultWindowStr, name); 266 maxResultWindow = Long.valueOf(DEFAULT_ES_MAX_RESULT_WINDOW_VALUE); 267 } 268 } 269 return maxResultWindow; 270 } 271 272 @Override 273 public long getResultsCountLimit() { 274 return getMaxResultWindow(); 275 } 276 277 /** 278 * Set the max result window where the PP can navigate, for testing purpose. 279 * 280 * @since 9.2 281 */ 282 public void setMaxResultWindow(long maxResultWindow) { 283 this.maxResultWindow = maxResultWindow; 284 } 285 286}