001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * bdelbosc 016 */ 017 018package org.nuxeo.elasticsearch.provider; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.index.query.QueryBuilder; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.DocumentModelList; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.core.api.SortInfo; 034import org.nuxeo.ecm.core.query.QueryParseException; 035import org.nuxeo.ecm.platform.query.api.AbstractPageProvider; 036import org.nuxeo.ecm.platform.query.api.Aggregate; 037import org.nuxeo.ecm.platform.query.api.AggregateDefinition; 038import org.nuxeo.ecm.platform.query.api.Bucket; 039import org.nuxeo.ecm.platform.query.api.PageProviderDefinition; 040import org.nuxeo.elasticsearch.aggregate.AggregateEsBase; 041import org.nuxeo.elasticsearch.aggregate.AggregateFactory; 042import org.nuxeo.elasticsearch.api.ElasticSearchService; 043import org.nuxeo.elasticsearch.api.EsResult; 044import org.nuxeo.elasticsearch.query.NxQueryBuilder; 045import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder; 046import org.nuxeo.runtime.api.Framework; 047 048public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> { 049 050 public static final String CORE_SESSION_PROPERTY = "coreSession"; 051 052 public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories"; 053 054 protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class); 055 056 private static final long serialVersionUID = 1L; 057 058 protected List<DocumentModel> currentPageDocuments; 059 060 protected Map<String, Aggregate<? extends Bucket>> currentAggregates; 061 062 @Override 063 public Map<String, Aggregate<? extends Bucket>> getAggregates() { 064 getCurrentPage(); 065 return currentAggregates; 066 } 067 068 @Override 069 public List<DocumentModel> getCurrentPage() { 070 long t0 = System.currentTimeMillis(); 071 072 // use a cache 073 if (currentPageDocuments != null) { 074 return currentPageDocuments; 075 } 076 error = null; 077 errorMessage = null; 078 if (log.isDebugEnabled()) { 079 log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(), 080 getMinMaxPageSize(), getCurrentPageOffset())); 081 } 082 currentPageDocuments = new ArrayList<DocumentModel>(); 083 // Build the ES query 084 QueryBuilder query = makeQueryBuilder(); 085 SortInfo[] sortArray = null; 086 if (sortInfos != null) { 087 sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]); 088 } 089 // Execute the ES query 090 ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class); 091 try { 092 NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query).offset( 093 (int) getCurrentPageOffset()).limit((int) getMinMaxPageSize()).addSort(sortArray).addAggregates( 094 buildAggregates()); 095 if (searchOnAllRepositories()) { 096 nxQuery.searchOnAllRepositories(); 097 } 098 EsResult ret = ess.queryAndAggregate(nxQuery); 099 DocumentModelList dmList = ret.getDocuments(); 100 currentAggregates = new HashMap<>(ret.getAggregates().size()); 101 for (Aggregate<Bucket> agg : ret.getAggregates()) { 102 currentAggregates.put(agg.getId(), agg); 103 } 104 setResultsCount(dmList.totalSize()); 105 currentPageDocuments = dmList; 106 } catch (QueryParseException e) { 107 error = e; 108 errorMessage = e.getMessage(); 109 log.warn(e.getMessage(), e); 110 } 111 112 // send event for statistics ! 113 fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments, 114 System.currentTimeMillis() - t0); 115 116 return currentPageDocuments; 117 } 118 119 private List<AggregateEsBase<? extends Bucket>> buildAggregates() { 120 ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size()); 121 for (AggregateDefinition def : getAggregateDefinitions()) { 122 ret.add(AggregateFactory.create(def, getSearchDocumentModel())); 123 } 124 return ret; 125 } 126 127 @Override 128 public boolean hasAggregateSupport() { 129 return true; 130 } 131 132 protected QueryBuilder makeQueryBuilder() { 133 QueryBuilder ret; 134 PageProviderDefinition def = getDefinition(); 135 if (def.getWhereClause() == null) { 136 ret = PageProviderQueryBuilder.makeQuery(def.getPattern(), getParameters(), 137 def.getQuotePatternParameters(), def.getEscapePatternParameters(), isNativeQuery()); 138 } else { 139 DocumentModel searchDocumentModel = getSearchDocumentModel(); 140 if (searchDocumentModel == null) { 141 throw new NuxeoException(String.format("Cannot build query of provider '%s': " 142 + "no search document model is set", getName())); 143 } 144 ret = PageProviderQueryBuilder.makeQuery(searchDocumentModel, def.getWhereClause(), getParameters(), 145 isNativeQuery()); 146 } 147 return ret; 148 } 149 150 @Override 151 protected void pageChanged() { 152 currentPageDocuments = null; 153 currentAggregates = null; 154 super.pageChanged(); 155 } 156 157 @Override 158 public void refresh() { 159 currentPageDocuments = null; 160 currentAggregates = null; 161 super.refresh(); 162 } 163 164 protected CoreSession getCoreSession() { 165 Map<String, Serializable> props = getProperties(); 166 CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY); 167 if (coreSession == null) { 168 throw new NuxeoException("cannot find core session"); 169 } 170 return coreSession; 171 } 172 173 protected boolean searchOnAllRepositories() { 174 String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY); 175 if (value == null) { 176 return false; 177 } 178 return Boolean.valueOf(value); 179 } 180 181 public boolean isNativeQuery() { 182 return true; 183 } 184}