001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020package org.nuxeo.elasticsearch.provider; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.index.query.QueryBuilder; 031import org.nuxeo.ecm.core.api.CoreSession; 032import org.nuxeo.ecm.core.api.DocumentModel; 033import org.nuxeo.ecm.core.api.DocumentModelList; 034import org.nuxeo.ecm.core.api.NuxeoException; 035import org.nuxeo.ecm.core.query.QueryParseException; 036import org.nuxeo.ecm.platform.query.api.Aggregate; 037import org.nuxeo.ecm.platform.query.api.AggregateDefinition; 038import org.nuxeo.ecm.platform.query.api.Bucket; 039import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 040import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 041import org.nuxeo.elasticsearch.aggregate.AggregateFactory; 042import org.nuxeo.elasticsearch.api.ElasticSearchService; 043import org.nuxeo.elasticsearch.api.EsResult; 044import org.nuxeo.elasticsearch.query.NxQueryBuilder; 045import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 046import org.nuxeo.runtime.api.Framework; 047 048/** 049 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider. 050 * 051 * @since 5.9.3 052 */ 053public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider { 054 055 public static final String CORE_SESSION_PROPERTY = "coreSession"; 056 057 public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories"; 058 059 protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class); 060 061 private static final long serialVersionUID = 1L; 062 063 protected List<DocumentModel> currentPageDocuments; 064 065 protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates; 066 067 @Override 068 public List<DocumentModel> getCurrentPage() { 069 070 long t0 = System.currentTimeMillis(); 071 072 // use a cache 073 if (currentPageDocuments != null) { 074 return currentPageDocuments; 075 } 076 error = null; 077 errorMessage = null; 078 if (log.isDebugEnabled()) { 079 log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(), 080 getMinMaxPageSize(), getCurrentPageOffset())); 081 } 082 currentPageDocuments = new ArrayList<>(); 083 CoreSession coreSession = getCoreSession(); 084 if (query == null) { 085 buildQuery(coreSession); 086 } 087 if (query == null) { 088 throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName())); 089 } 090 // Build and execute the ES query 091 ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class); 092 try { 093 NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query) 094 .offset((int) getCurrentPageOffset()) 095 .limit(getLimit()) 096 .addAggregates(buildAggregates()); 097 if (searchOnAllRepositories()) { 098 nxQuery.searchOnAllRepositories(); 099 } 100 101 List<String> highlightFields = getHighlights(); 102 if (highlightFields != null && !highlightFields.isEmpty()) { 103 nxQuery.highlight(highlightFields); 104 } 105 106 EsResult ret = ess.queryAndAggregate(nxQuery); 107 DocumentModelList dmList = ret.getDocuments(); 108 currentAggregates = new HashMap<>(ret.getAggregates().size()); 109 for (Aggregate<Bucket> agg : ret.getAggregates()) { 110 currentAggregates.put(agg.getId(), agg); 111 } 112 setResultsCount(dmList.totalSize()); 113 currentPageDocuments = dmList; 114 } catch (QueryParseException e) { 115 error = e; 116 errorMessage = e.getMessage(); 117 log.warn(e.getMessage(), e); 118 } 119 120 // send event for statistics ! 121 fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0); 122 123 return currentPageDocuments; 124 } 125 126 protected int getLimit() { 127 int ret = (int) getMinMaxPageSize(); 128 if (ret == 0) { 129 ret = -1; 130 } 131 return ret; 132 } 133 134 public QueryBuilder getCurrentQueryAsEsBuilder() { 135 String nxql = getCurrentQuery(); 136 return NxqlQueryConverter.toESQueryBuilder(nxql); 137 } 138 139 @Override 140 protected void pageChanged() { 141 currentPageDocuments = null; 142 currentAggregates = null; 143 super.pageChanged(); 144 } 145 146 @Override 147 public void refresh() { 148 currentPageDocuments = null; 149 currentAggregates = null; 150 super.refresh(); 151 } 152 153 @Override 154 protected CoreSession getCoreSession() { 155 Map<String, Serializable> props = getProperties(); 156 CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY); 157 if (coreSession == null) { 158 throw new NuxeoException("cannot find core session"); 159 } 160 return coreSession; 161 } 162 163 private List<AggregateEsBase<? extends Bucket>> buildAggregates() { 164 ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size()); 165 for (AggregateDefinition def : getAggregateDefinitions()) { 166 ret.add(AggregateFactory.create(def, getSearchDocumentModel())); 167 } 168 return ret; 169 } 170 171 protected boolean searchOnAllRepositories() { 172 String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY); 173 if (value == null) { 174 return false; 175 } 176 return Boolean.parseBoolean(value); 177 } 178 179 @Override 180 public boolean hasAggregateSupport() { 181 return true; 182 } 183 184 @Override 185 public Map<String, Aggregate<? extends Bucket>> getAggregates() { 186 getCurrentPage(); 187 return currentAggregates; 188 } 189 190 /** 191 * Extends the default implementation to add results of aggregates 192 * 193 * @since 7.4 194 */ 195 @Override 196 protected void incorporateAggregates(Map<String, Serializable> eventProps) { 197 198 super.incorporateAggregates(eventProps); 199 if (currentAggregates != null) { 200 HashMap<String, Serializable> aggregateMatches = new HashMap<>(); 201 for (String key : currentAggregates.keySet()) { 202 Aggregate<? extends Bucket> ag = currentAggregates.get(key); 203 ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<>(); 204 for (Bucket bucket : ag.getBuckets()) { 205 HashMap<String, Serializable> b = new HashMap<>(); 206 b.put("key", bucket.getKey()); 207 b.put("count", bucket.getDocCount()); 208 buckets.add(b); 209 } 210 aggregateMatches.put(key, buckets); 211 } 212 eventProps.put("aggregatesMatches", aggregateMatches); 213 } 214 } 215}