001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * bdelbosc 016 */ 017 018package org.nuxeo.elasticsearch.provider; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.index.query.QueryBuilder; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.DocumentModelList; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.core.query.QueryParseException; 034import org.nuxeo.ecm.platform.query.api.Aggregate; 035import org.nuxeo.ecm.platform.query.api.AggregateDefinition; 036import org.nuxeo.ecm.platform.query.api.Bucket; 037import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 038import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 039import org.nuxeo.elasticsearch.aggregate.AggregateFactory; 040import org.nuxeo.elasticsearch.api.ElasticSearchService; 041import org.nuxeo.elasticsearch.api.EsResult; 042import org.nuxeo.elasticsearch.query.NxQueryBuilder; 043import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 044import org.nuxeo.runtime.api.Framework; 045 046/** 047 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider. 048 * 049 * @since 5.9.3 050 */ 051public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider { 052 053 public static final String CORE_SESSION_PROPERTY = "coreSession"; 054 055 public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories"; 056 057 protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class); 058 059 private static final long serialVersionUID = 1L; 060 061 protected List<DocumentModel> currentPageDocuments; 062 063 protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates; 064 065 @Override 066 public List<DocumentModel> getCurrentPage() { 067 068 long t0 = System.currentTimeMillis(); 069 070 // use a cache 071 if (currentPageDocuments != null) { 072 return currentPageDocuments; 073 } 074 error = null; 075 errorMessage = null; 076 if (log.isDebugEnabled()) { 077 log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(), 078 getMinMaxPageSize(), getCurrentPageOffset())); 079 } 080 currentPageDocuments = new ArrayList<DocumentModel>(); 081 CoreSession coreSession = getCoreSession(); 082 if (query == null) { 083 buildQuery(coreSession); 084 } 085 if (query == null) { 086 throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName())); 087 } 088 // Build and execute the ES query 089 ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class); 090 try { 091 NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query).offset( 092 (int) getCurrentPageOffset()).limit(getLimit()).addAggregates(buildAggregates()); 093 if (searchOnAllRepositories()) { 094 nxQuery.searchOnAllRepositories(); 095 } 096 EsResult ret = ess.queryAndAggregate(nxQuery); 097 DocumentModelList dmList = ret.getDocuments(); 098 currentAggregates = new HashMap<>(ret.getAggregates().size()); 099 for (Aggregate<Bucket> agg : ret.getAggregates()) { 100 currentAggregates.put(agg.getId(), agg); 101 } 102 setResultsCount(dmList.totalSize()); 103 currentPageDocuments = dmList; 104 } catch (QueryParseException e) { 105 error = e; 106 errorMessage = e.getMessage(); 107 log.warn(e.getMessage(), e); 108 } 109 110 // send event for statistics ! 111 fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0); 112 113 return currentPageDocuments; 114 } 115 116 protected int getLimit() { 117 int ret = (int) getMinMaxPageSize(); 118 if (ret == 0) { 119 ret = -1; 120 } 121 return ret; 122 } 123 124 public QueryBuilder getCurrentQueryAsEsBuilder() { 125 String nxql = getCurrentQuery(); 126 return NxqlQueryConverter.toESQueryBuilder(nxql); 127 } 128 129 @Override 130 protected void pageChanged() { 131 currentPageDocuments = null; 132 currentAggregates = null; 133 super.pageChanged(); 134 } 135 136 @Override 137 public void refresh() { 138 currentPageDocuments = null; 139 currentAggregates = null; 140 super.refresh(); 141 } 142 143 @Override 144 protected CoreSession getCoreSession() { 145 Map<String, Serializable> props = getProperties(); 146 CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY); 147 if (coreSession == null) { 148 throw new NuxeoException("cannot find core session"); 149 } 150 return coreSession; 151 } 152 153 private List<AggregateEsBase<? extends Bucket>> buildAggregates() { 154 ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size()); 155 for (AggregateDefinition def : getAggregateDefinitions()) { 156 ret.add(AggregateFactory.create(def, getSearchDocumentModel())); 157 } 158 return ret; 159 } 160 161 protected boolean searchOnAllRepositories() { 162 String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY); 163 if (value == null) { 164 return false; 165 } 166 return Boolean.valueOf(value); 167 } 168 169 @Override 170 public boolean hasAggregateSupport() { 171 return true; 172 } 173 174 @Override 175 public Map<String, Aggregate<? extends Bucket>> getAggregates() { 176 getCurrentPage(); 177 return currentAggregates; 178 } 179 180 /** 181 * Extends the default implementation to add results of aggregates 182 * 183 * @param eventProps 184 * @since 7.4 185 */ 186 @Override 187 protected void incorporateAggregates(Map<String, Serializable> eventProps) { 188 189 super.incorporateAggregates(eventProps); 190 if (currentAggregates != null) { 191 HashMap<String, Serializable> aggregateMatches = new HashMap<String, Serializable>(); 192 for (String key : currentAggregates.keySet()) { 193 Aggregate<? extends Bucket> ag = currentAggregates.get(key); 194 ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<HashMap<String, Serializable>>(); 195 for (Bucket bucket : ag.getBuckets()) { 196 HashMap<String, Serializable> b = new HashMap<String, Serializable>(); 197 b.put("key", bucket.getKey()); 198 b.put("count", bucket.getDocCount()); 199 buckets.add(b); 200 } 201 aggregateMatches.put(key, buckets); 202 } 203 eventProps.put("aggregatesMatches", aggregateMatches); 204 } 205 } 206}