/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.provider;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.SortInfo;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.ecm.platform.query.api.AbstractPageProvider;
import org.nuxeo.ecm.platform.query.api.Aggregate;
import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
import org.nuxeo.ecm.platform.query.api.Bucket;
import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
import org.nuxeo.elasticsearch.api.ElasticSearchService;
import org.nuxeo.elasticsearch.api.EsResult;
import org.nuxeo.elasticsearch.query.NxQueryBuilder;
import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder;
import org.nuxeo.runtime.api.Framework;

/**
 * Page provider that builds an Elasticsearch {@link QueryBuilder} from its definition, runs it through the
 * {@link ElasticSearchService} and exposes the resulting documents and aggregates.
 * <p>
 * The documents and aggregates of the current page are cached in this instance until {@link #pageChanged()} or
 * {@link #refresh()} is called.
 */
public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> {

    /** Name of the provider property holding the {@link CoreSession} used to run the query. */
    public static final String CORE_SESSION_PROPERTY = "coreSession";

    /** Name of the provider property that, when it evaluates to {@code true}, extends the search to all repositories. */
    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";

    protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class);

    private static final long serialVersionUID = 1L;

    /** Cached documents of the current page; {@code null} means the query has to be (re-)executed. */
    protected List<DocumentModel> currentPageDocuments;

    /** Cached aggregates of the current page, keyed by aggregate id. */
    protected Map<String, Aggregate<? extends Bucket>> currentAggregates;

    /**
     * Returns the aggregates computed for the current page, keyed by aggregate id.
     * <p>
     * Triggers the query through {@link #getCurrentPage()} if no page is cached yet.
     *
     * @return the aggregates of the current page; empty when the query could not be parsed
     */
    @Override
    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
        getCurrentPage();
        return currentAggregates;
    }

    /**
     * Returns the documents of the current page, executing the Elasticsearch query on the first call after a page
     * change or refresh.
     * <p>
     * A {@link QueryParseException} raised while querying is not propagated: it is stored in {@code error} /
     * {@code errorMessage}, logged as a warning, and an empty page is returned. A search event is fired after each
     * executed query, successful or not.
     *
     * @return the cached or freshly fetched documents of the current page, never {@code null}
     * @throws NuxeoException if the core session property is missing, or if the query cannot be built because no
     *             search document model is set
     */
    @Override
    public List<DocumentModel> getCurrentPage() {
        // use a cache
        if (currentPageDocuments != null) {
            return currentPageDocuments;
        }
        // only time the calls that actually hit Elasticsearch
        long t0 = System.currentTimeMillis();
        error = null;
        errorMessage = null;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
                    getMinMaxPageSize(), getCurrentPageOffset()));
        }
        // Start from empty results so that both accessors return non-null values even if the query fails below
        currentPageDocuments = new ArrayList<>();
        currentAggregates = new HashMap<>();
        // Build the ES query
        QueryBuilder query = makeQueryBuilder();
        SortInfo[] sortArray = null;
        if (sortInfos != null) {
            sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]);
        }
        // Execute the ES query
        ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class);
        try {
            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query)
                                                                         .offset((int) getCurrentPageOffset())
                                                                         .limit((int) getMinMaxPageSize())
                                                                         .addSort(sortArray)
                                                                         .addAggregates(buildAggregates());
            if (searchOnAllRepositories()) {
                nxQuery.searchOnAllRepositories();
            }
            EsResult ret = ess.queryAndAggregate(nxQuery);
            DocumentModelList dmList = ret.getDocuments();
            Map<String, Aggregate<? extends Bucket>> aggregates = new HashMap<>(ret.getAggregates().size());
            for (Aggregate<Bucket> agg : ret.getAggregates()) {
                aggregates.put(agg.getId(), agg);
            }
            currentAggregates = aggregates;
            setResultsCount(dmList.totalSize());
            currentPageDocuments = dmList;
        } catch (QueryParseException e) {
            error = e;
            errorMessage = e.getMessage();
            log.warn(e.getMessage(), e);
        }

        // send event for statistics !
        fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments,
                System.currentTimeMillis() - t0);

        return currentPageDocuments;
    }

    /**
     * Creates one Elasticsearch aggregate per aggregate definition of this provider, using the search document
     * model as the source of the aggregate selection.
     */
    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
        List<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
        for (AggregateDefinition def : getAggregateDefinitions()) {
            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
        }
        return ret;
    }

    /**
     * @return always {@code true}: this provider returns aggregates through {@link #getAggregates()}
     */
    @Override
    public boolean hasAggregateSupport() {
        return true;
    }

    /**
     * Builds the Elasticsearch query from the provider definition.
     * <p>
     * Without a where clause the query is built from the definition pattern and the provider parameters; otherwise
     * it is built from the search document model and the where clause.
     *
     * @return the Elasticsearch query to execute
     * @throws NuxeoException if the definition has a where clause but no search document model is set
     */
    protected QueryBuilder makeQueryBuilder() {
        PageProviderDefinition def = getDefinition();
        if (def.getWhereClause() == null) {
            return PageProviderQueryBuilder.makeQuery(def.getPattern(), getParameters(),
                    def.getQuotePatternParameters(), def.getEscapePatternParameters(), isNativeQuery());
        }
        DocumentModel searchDocumentModel = getSearchDocumentModel();
        if (searchDocumentModel == null) {
            throw new NuxeoException(String.format("Cannot build query of provider '%s': "
                    + "no search document model is set", getName()));
        }
        return PageProviderQueryBuilder.makeQuery(searchDocumentModel, def.getWhereClause(), getParameters(),
                isNativeQuery());
    }

    /** Drops the cached page and aggregates so that the next access re-runs the query. */
    @Override
    protected void pageChanged() {
        currentPageDocuments = null;
        currentAggregates = null;
        super.pageChanged();
    }

    /** Drops the cached page and aggregates so that the next access re-runs the query. */
    @Override
    public void refresh() {
        currentPageDocuments = null;
        currentAggregates = null;
        super.refresh();
    }

    /**
     * Returns the core session stored in the provider properties under {@link #CORE_SESSION_PROPERTY}.
     *
     * @return the core session, never {@code null}
     * @throws NuxeoException if the property is not set
     */
    protected CoreSession getCoreSession() {
        Map<String, Serializable> props = getProperties();
        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
        if (coreSession == null) {
            throw new NuxeoException("cannot find core session");
        }
        return coreSession;
    }

    /**
     * Tells whether the {@link #SEARCH_ON_ALL_REPOSITORIES_PROPERTY} property asks for a search on all repositories.
     * <p>
     * The property value is read through its string form, so both a {@code "true"} string (case-insensitive) and a
     * {@link Boolean#TRUE} value enable the option.
     *
     * @return {@code true} if the property is set and evaluates to true, {@code false} otherwise
     */
    protected boolean searchOnAllRepositories() {
        Serializable value = getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    /**
     * Flag handed to {@link PageProviderQueryBuilder} when building the query.
     *
     * @return always {@code true} in this implementation
     */
    public boolean isNativeQuery() {
        return true;
    }
}