/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.provider;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.ecm.platform.query.api.Aggregate;
import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
import org.nuxeo.ecm.platform.query.api.Bucket;
import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.services.config.ConfigurationService;

/**
 * Elasticsearch Page provider that converts the NXQL query built by CoreQueryDocumentPageProvider.
 *
 * @since 5.9.3
 */
public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider {

    /** Name of the page provider property holding the {@link CoreSession} used to run the query. */
    public static final String CORE_SESSION_PROPERTY = "coreSession";

    /** Name of the page provider property that, when it parses as {@code true}, enables a search on all repositories. */
    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";

    // @since 9.2
    public static final String ES_MAX_RESULT_WINDOW_PROPERTY = "org.nuxeo.elasticsearch.provider.maxResultWindow";

    // This is the default ES index.max_result_window
    public static final String DEFAULT_ES_MAX_RESULT_WINDOW_VALUE = "10000";

    protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class);

    private static final long serialVersionUID = 1L;

    /** Aggregates returned by the last executed query, keyed by aggregate id; {@code null} until a query succeeds. */
    protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates;

    /** Lazily resolved max result window, see {@link #getMaxResultWindow()}. */
    protected Long maxResultWindow;

    /**
     * Returns the documents of the current page, running the Elasticsearch query on first access and serving the
     * cached list afterwards.
     * <p>
     * A {@link QueryParseException} is not propagated: it is stored in {@code error} / {@code errorMessage}, logged
     * at warn level, and the (empty) page is returned.
     *
     * @return the documents of the current page
     * @throws NuxeoException if the query is still {@code null} after {@code buildQuery}
     */
    @Override
    public List<DocumentModel> getCurrentPage() {

        long t0 = System.currentTimeMillis();

        // use a cache
        if (currentPageDocuments != null) {
            return currentPageDocuments;
        }
        error = null;
        errorMessage = null;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
                    getMinMaxPageSize(), getCurrentPageOffset()));
        }
        currentPageDocuments = new ArrayList<>();
        CoreSession coreSession = getCoreSession();
        if (query == null) {
            buildQuery(coreSession);
        }
        if (query == null) {
            throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName()));
        }
        // Build and execute the ES query
        ElasticSearchService ess = Framework.getService(ElasticSearchService.class);
        try {
            NxQueryBuilder nxQuery = new NxQueryBuilder(coreSession).nxql(query)
                                                                    .offset((int) getCurrentPageOffset())
                                                                    .limit(getLimit())
                                                                    .addAggregates(buildAggregates());
            if (searchOnAllRepositories()) {
                nxQuery.searchOnAllRepositories();
            }

            List<String> highlightFields = getHighlights();
            if (highlightFields != null && !highlightFields.isEmpty()) {
                nxQuery.highlight(highlightFields);
            }

            EsResult ret = ess.queryAndAggregate(nxQuery);
            DocumentModelList dmList = ret.getDocuments();
            currentAggregates = new HashMap<>(ret.getAggregates().size());
            for (Aggregate<Bucket> agg : ret.getAggregates()) {
                currentAggregates.put(agg.getId(), agg);
            }
            setResultsCount(dmList.totalSize());
            currentPageDocuments = dmList;
        } catch (QueryParseException e) {
            error = e;
            errorMessage = e.getMessage();
            log.warn(e.getMessage(), e);
        }

        // send event for statistics !
        fireSearchEvent(coreSession.getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0);

        return currentPageDocuments;
    }

    /**
     * Returns the limit passed to the Elasticsearch query: {@code getMinMaxPageSize()} narrowed to an {@code int},
     * with {@code 0} mapped to {@code -1}.
     */
    protected int getLimit() {
        int ret = (int) getMinMaxPageSize();
        if (ret == 0) {
            ret = -1;
        }
        return ret;
    }

    /**
     * Returns the current NXQL query converted to an Elasticsearch {@link QueryBuilder}.
     */
    public QueryBuilder getCurrentQueryAsEsBuilder() {
        String nxql = getCurrentQuery();
        return NxqlQueryConverter.toESQueryBuilder(nxql);
    }

    /** Drops the cached page and aggregates before delegating to the parent implementation. */
    @Override
    protected void pageChanged() {
        currentPageDocuments = null;
        currentAggregates = null;
        super.pageChanged();
    }

    /** Drops the cached page and aggregates before delegating to the parent implementation. */
    @Override
    public void refresh() {
        currentPageDocuments = null;
        currentAggregates = null;
        super.refresh();
    }

    /**
     * Returns the session stored under {@link #CORE_SESSION_PROPERTY} in the provider properties.
     *
     * @throws NuxeoException if the property is not set
     */
    @Override
    protected CoreSession getCoreSession() {
        Map<String, Serializable> props = getProperties();
        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
        if (coreSession == null) {
            throw new NuxeoException("cannot find core session");
        }
        return coreSession;
    }

    /**
     * Creates the Elasticsearch aggregates to submit with the query, one per aggregate definition.
     * <p>
     * When aggregates are skipped, only those having a non-empty selection are kept.
     */
    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
        List<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
        boolean skip = isSkipAggregates();
        for (AggregateDefinition def : getAggregateDefinitions()) {
            AggregateEsBase<? extends Bucket> agg = AggregateFactory.create(def, getSearchDocumentModel());
            if (!skip || !agg.getSelection().isEmpty()) {
                // if we want to skip aggregates but one is selected, it has to be computed to filter the result set
                // reuse the aggregate that was just created instead of building a second identical one
                ret.add(agg);
            }
        }
        return ret;
    }

    /**
     * Tells whether the query must target all repositories, based on the
     * {@link #SEARCH_ON_ALL_REPOSITORIES_PROPERTY} provider property.
     *
     * @return {@code true} only if the property is set and its string form parses as {@code true}
     */
    protected boolean searchOnAllRepositories() {
        Serializable value = getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
        // going through toString() accepts a Boolean property value as well as a String one
        return value != null && Boolean.parseBoolean(value.toString());
    }

    @Override
    public boolean hasAggregateSupport() {
        return true;
    }

    /**
     * Returns the aggregates of the current page, computing the page first if needed.
     * <p>
     * The returned map is {@code null} when no query result has populated the aggregates (for instance after a
     * {@link QueryParseException}).
     */
    @Override
    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
        getCurrentPage();
        return currentAggregates;
    }

    /**
     * Extends the default implementation to add results of aggregates
     *
     * @since 7.4
     */
    @Override
    protected void incorporateAggregates(Map<String, Serializable> eventProps) {

        super.incorporateAggregates(eventProps);
        if (currentAggregates != null) {
            // concrete HashMap / ArrayList types are declared on purpose: values must be Serializable
            HashMap<String, Serializable> aggregateMatches = new HashMap<>();
            for (Map.Entry<String, Aggregate<? extends Bucket>> entry : currentAggregates.entrySet()) {
                ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<>();
                for (Bucket bucket : entry.getValue().getBuckets()) {
                    HashMap<String, Serializable> b = new HashMap<>();
                    b.put("key", bucket.getKey());
                    b.put("count", bucket.getDocCount());
                    buckets.add(b);
                }
                aggregateMatches.put(entry.getKey(), buckets);
            }
            eventProps.put("aggregatesMatches", aggregateMatches);
        }
    }

    /**
     * Returns {@code false} as soon as {@code resultsCount + pageSize} exceeds the max result window, otherwise
     * defers to the parent implementation.
     */
    @Override
    public boolean isLastPageAvailable() {
        if ((getResultsCount() + getPageSize()) <= getMaxResultWindow()) {
            // NOTE(review): this delegates to super.isNextPageAvailable(), not super.isLastPageAvailable();
            // kept as is since the parent class is not visible here - confirm this is intended.
            return super.isNextPageAvailable();
        }
        return false;
    }

    /**
     * Returns {@code false} as soon as the end of the next page ({@code offset + 2 * pageSize}) exceeds the max
     * result window, otherwise defers to the parent implementation.
     */
    @Override
    public boolean isNextPageAvailable() {
        if ((getCurrentPageOffset() + 2 * getPageSize()) <= getMaxResultWindow()) {
            return super.isNextPageAvailable();
        }
        return false;
    }

    /**
     * Returns the max result window divided by the page size.
     */
    @Override
    public long getPageLimit() {
        // NOTE(review): throws ArithmeticException when getPageSize() returns 0; the appropriate fallback depends
        // on the parent contract, to be confirmed.
        return getMaxResultWindow() / getPageSize();
    }

    /**
     * Returns the max result window where the PP can navigate without raising Elasticsearch
     * QueryPhaseExecutionException. {@code from + size} must be less than or equal to this value.
     * <p>
     * The value is read once from the {@link #ES_MAX_RESULT_WINDOW_PROPERTY} configuration property; an
     * unparseable value falls back to {@link #DEFAULT_ES_MAX_RESULT_WINDOW_VALUE} with a warning.
     *
     * @since 9.2
     */
    public long getMaxResultWindow() {
        if (maxResultWindow == null) {
            ConfigurationService cs = Framework.getService(ConfigurationService.class);
            String maxResultWindowStr = cs.getProperty(ES_MAX_RESULT_WINDOW_PROPERTY,
                    DEFAULT_ES_MAX_RESULT_WINDOW_VALUE);
            try {
                maxResultWindow = Long.valueOf(maxResultWindowStr);
            } catch (NumberFormatException e) {
                log.warn(String.format(
                        "Invalid maxResultWindow property value: %s for page provider: %s, fallback to default.",
                        maxResultWindowStr, getName()));
                maxResultWindow = Long.valueOf(DEFAULT_ES_MAX_RESULT_WINDOW_VALUE);
            }
        }
        return maxResultWindow;
    }

    /** Returns the max result window, see {@link #getMaxResultWindow()}. */
    @Override
    public long getResultsCountLimit() {
        return getMaxResultWindow();
    }

    /**
     * Set the max result window where the PP can navigate, for testing purpose.
     *
     * @since 9.2
     */
    public void setMaxResultWindow(long maxResultWindow) {
        this.maxResultWindow = maxResultWindow;
    }

}