001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020package org.nuxeo.elasticsearch.provider; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.index.query.QueryBuilder; 031import org.nuxeo.ecm.core.api.CoreSession; 032import org.nuxeo.ecm.core.api.DocumentModel; 033import org.nuxeo.ecm.core.api.DocumentModelList; 034import org.nuxeo.ecm.core.api.NuxeoException; 035import org.nuxeo.ecm.core.query.QueryParseException; 036import org.nuxeo.ecm.platform.query.api.Aggregate; 037import org.nuxeo.ecm.platform.query.api.AggregateDefinition; 038import org.nuxeo.ecm.platform.query.api.Bucket; 039import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 040import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 041import org.nuxeo.elasticsearch.aggregate.AggregateFactory; 042import org.nuxeo.elasticsearch.api.ElasticSearchService; 043import org.nuxeo.elasticsearch.api.EsResult; 044import org.nuxeo.elasticsearch.query.NxQueryBuilder; 045import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.services.config.ConfigurationService; 048 049/** 050 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider. 051 * 052 * @since 5.9.3 053 */ 054public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider { 055 056 protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class); 057 058 private static final long serialVersionUID = 1L; 059 060 public static final String CORE_SESSION_PROPERTY = "coreSession"; 061 062 public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories"; 063 064 // @since 9.2 065 public static final String ES_MAX_RESULT_WINDOW_PROPERTY = "org.nuxeo.elasticsearch.provider.maxResultWindow"; 066 067 // This is the default ES index.max_result_window 068 public static final String DEFAULT_ES_MAX_RESULT_WINDOW_VALUE = "10000"; 069 070 protected List<DocumentModel> currentPageDocuments; 071 072 protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates; 073 074 protected Long maxResultWindow; 075 076 @Override 077 public List<DocumentModel> getCurrentPage() { 078 079 long t0 = System.currentTimeMillis(); 080 081 // use a cache 082 if (currentPageDocuments != null) { 083 return currentPageDocuments; 084 } 085 error = null; 086 errorMessage = null; 087 if (log.isDebugEnabled()) { 088 log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(), 089 getMinMaxPageSize(), getCurrentPageOffset())); 090 } 091 currentPageDocuments = new ArrayList<>(); 092 CoreSession coreSession = getCoreSession(); 093 if (query == null) { 094 buildQuery(coreSession); 095 } 096 if (query == null) { 097 throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName())); 098 } 099 // Build and execute the ES query 100 ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class); 101 try { 102 NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query) 103 .offset((int) getCurrentPageOffset()) 104 .limit(getLimit()) 105 .addAggregates(buildAggregates()); 106 if (searchOnAllRepositories()) { 107 nxQuery.searchOnAllRepositories(); 108 } 109 110 List<String> highlightFields = getHighlights(); 111 if (highlightFields != null && !highlightFields.isEmpty()) { 112 nxQuery.highlight(highlightFields); 113 } 114 115 EsResult ret = ess.queryAndAggregate(nxQuery); 116 DocumentModelList dmList = ret.getDocuments(); 117 currentAggregates = new HashMap<>(ret.getAggregates().size()); 118 for (Aggregate<Bucket> agg : ret.getAggregates()) { 119 currentAggregates.put(agg.getId(), agg); 120 } 121 setResultsCount(dmList.totalSize()); 122 currentPageDocuments = dmList; 123 } catch (QueryParseException e) { 124 error = e; 125 errorMessage = e.getMessage(); 126 log.warn(e.getMessage(), e); 127 } 128 129 // send event for statistics ! 130 fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0); 131 132 return currentPageDocuments; 133 } 134 135 protected int getLimit() { 136 int ret = (int) getMinMaxPageSize(); 137 if (ret == 0) { 138 ret = -1; 139 } 140 return ret; 141 } 142 143 public QueryBuilder getCurrentQueryAsEsBuilder() { 144 String nxql = getCurrentQuery(); 145 return NxqlQueryConverter.toESQueryBuilder(nxql); 146 } 147 148 @Override 149 protected void pageChanged() { 150 currentPageDocuments = null; 151 currentAggregates = null; 152 super.pageChanged(); 153 } 154 155 @Override 156 public void refresh() { 157 currentPageDocuments = null; 158 currentAggregates = null; 159 super.refresh(); 160 } 161 162 @Override 163 protected CoreSession getCoreSession() { 164 Map<String, Serializable> props = getProperties(); 165 CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY); 166 if (coreSession == null) { 167 throw new NuxeoException("cannot find core session"); 168 } 169 return coreSession; 170 } 171 172 private List<AggregateEsBase<? extends Bucket>> buildAggregates() { 173 ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size()); 174 for (AggregateDefinition def : getAggregateDefinitions()) { 175 ret.add(AggregateFactory.create(def, getSearchDocumentModel())); 176 } 177 return ret; 178 } 179 180 protected boolean searchOnAllRepositories() { 181 String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY); 182 if (value == null) { 183 return false; 184 } 185 return Boolean.parseBoolean(value); 186 } 187 188 @Override 189 public boolean hasAggregateSupport() { 190 return true; 191 } 192 193 @Override 194 public Map<String, Aggregate<? extends Bucket>> getAggregates() { 195 getCurrentPage(); 196 return currentAggregates; 197 } 198 199 /** 200 * Extends the default implementation to add results of aggregates 201 * 202 * @since 7.4 203 */ 204 @Override 205 protected void incorporateAggregates(Map<String, Serializable> eventProps) { 206 207 super.incorporateAggregates(eventProps); 208 if (currentAggregates != null) { 209 HashMap<String, Serializable> aggregateMatches = new HashMap<>(); 210 for (String key : currentAggregates.keySet()) { 211 Aggregate<? extends Bucket> ag = currentAggregates.get(key); 212 ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<>(); 213 for (Bucket bucket : ag.getBuckets()) { 214 HashMap<String, Serializable> b = new HashMap<>(); 215 b.put("key", bucket.getKey()); 216 b.put("count", bucket.getDocCount()); 217 buckets.add(b); 218 } 219 aggregateMatches.put(key, buckets); 220 } 221 eventProps.put("aggregatesMatches", aggregateMatches); 222 } 223 } 224 225 @Override 226 public boolean isLastPageAvailable() { 227 if ((getResultsCount() + getPageSize()) < getMaxResultWindow()) { 228 return super.isNextPageAvailable(); 229 } 230 return false; 231 } 232 233 @Override 234 public boolean isNextPageAvailable() { 235 if ((getCurrentPageOffset() + 2 * getPageSize()) < getMaxResultWindow()) { 236 return super.isNextPageAvailable(); 237 } 238 return false; 239 } 240 241 @Override 242 public long getPageLimit() { 243 return getMaxResultWindow() / getPageSize(); 244 } 245 246 /** 247 * Returns the max result window where the PP can navigate without raising Elasticsearch 248 * QueryPhaseExecutionException. {@code from + size} must be less than or equal to this value. 249 * 250 * @since 9.2 251 */ 252 public long getMaxResultWindow() { 253 if (maxResultWindow == null) { 254 ConfigurationService cs = Framework.getService(ConfigurationService.class); 255 String maxResultWindowStr = cs.getProperty(ES_MAX_RESULT_WINDOW_PROPERTY, 256 DEFAULT_ES_MAX_RESULT_WINDOW_VALUE); 257 try { 258 maxResultWindow = Long.valueOf(maxResultWindowStr); 259 } catch (NumberFormatException e) { 260 log.warn(String.format( 261 "Invalid maxResultWindow property value: %s for page provider: %s, fallback to default.", 262 maxResultWindow, getName())); 263 maxResultWindow = Long.valueOf(DEFAULT_ES_MAX_RESULT_WINDOW_VALUE); 264 } 265 } 266 return maxResultWindow; 267 } 268 269 /** 270 * Set the max result window where the PP can navigate, for testing purpose. 271 * 272 * @since 9.2 273 */ 274 public void setMaxResultWindow(long maxResultWindow) { 275 this.maxResultWindow = maxResultWindow; 276 } 277 278}