001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.elasticsearch.provider;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.commons.lang.StringUtils;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.elasticsearch.index.query.QueryBuilder;
032import org.nuxeo.ecm.core.api.CoreSession;
033import org.nuxeo.ecm.core.api.DocumentModel;
034import org.nuxeo.ecm.core.api.DocumentModelList;
035import org.nuxeo.ecm.core.api.NuxeoException;
036import org.nuxeo.ecm.core.api.SortInfo;
037import org.nuxeo.ecm.core.query.QueryParseException;
038import org.nuxeo.ecm.platform.query.api.AbstractPageProvider;
039import org.nuxeo.ecm.platform.query.api.Aggregate;
040import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
041import org.nuxeo.ecm.platform.query.api.Bucket;
042import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
043import org.nuxeo.ecm.platform.query.api.QuickFilter;
044import org.nuxeo.ecm.platform.query.api.WhereClauseDefinition;
045import org.nuxeo.ecm.platform.query.nxql.NXQLQueryBuilder;
046import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
047import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
048import org.nuxeo.elasticsearch.api.ElasticSearchService;
049import org.nuxeo.elasticsearch.api.EsResult;
050import org.nuxeo.elasticsearch.query.NxQueryBuilder;
051import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder;
052import org.nuxeo.runtime.api.Framework;
053
054public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> {
055
056    public static final String CORE_SESSION_PROPERTY = "coreSession";
057
058    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
059
060    protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class);
061
062    private static final long serialVersionUID = 1L;
063
064    protected List<DocumentModel> currentPageDocuments;
065
066    protected Map<String, Aggregate<? extends Bucket>> currentAggregates;
067
068    @Override
069    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
070        getCurrentPage();
071        return currentAggregates;
072    }
073
074    @Override
075    public List<DocumentModel> getCurrentPage() {
076        long t0 = System.currentTimeMillis();
077
078        // use a cache
079        if (currentPageDocuments != null) {
080            return currentPageDocuments;
081        }
082        error = null;
083        errorMessage = null;
084        if (log.isDebugEnabled()) {
085            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
086                    getMinMaxPageSize(), getCurrentPageOffset()));
087        }
088        currentPageDocuments = new ArrayList<>();
089        // Build the ES query
090        QueryBuilder query = makeQueryBuilder();
091        SortInfo[] sortArray = null;
092        if (sortInfos != null) {
093            sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]);
094        }
095        // Execute the ES query
096        ElasticSearchService ess = Framework.getService(ElasticSearchService.class);
097        try {
098            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query)
099                                                                         .offset((int) getCurrentPageOffset())
100                                                                         .limit((int) getMinMaxPageSize())
101                                                                         .addSort(sortArray)
102                                                                         .addAggregates(buildAggregates());
103            if (searchOnAllRepositories()) {
104                nxQuery.searchOnAllRepositories();
105            }
106
107            List<String> highlightFields = getHighlights();
108            if (highlightFields != null && !highlightFields.isEmpty()) {
109                nxQuery.highlight(highlightFields);
110            }
111
112            EsResult ret = ess.queryAndAggregate(nxQuery);
113            DocumentModelList dmList = ret.getDocuments();
114            currentAggregates = new HashMap<>(ret.getAggregates().size());
115            for (Aggregate<Bucket> agg : ret.getAggregates()) {
116                currentAggregates.put(agg.getId(), agg);
117            }
118            setResultsCount(dmList.totalSize());
119            currentPageDocuments = dmList;
120        } catch (QueryParseException e) {
121            error = e;
122            errorMessage = e.getMessage();
123            log.warn(e.getMessage(), e);
124        }
125
126        // send event for statistics !
127        fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments,
128                System.currentTimeMillis() - t0);
129
130        return currentPageDocuments;
131    }
132
133    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
134        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
135        for (AggregateDefinition def : getAggregateDefinitions()) {
136            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
137        }
138        return ret;
139    }
140
141    @Override
142    public boolean hasAggregateSupport() {
143        return true;
144    }
145
146    protected QueryBuilder makeQueryBuilder() {
147        QueryBuilder ret;
148        PageProviderDefinition def = getDefinition();
149        List<QuickFilter> quickFilters = getQuickFilters();
150        String quickFiltersClause = "";
151
152        if (quickFilters != null && !quickFilters.isEmpty()) {
153            for (QuickFilter quickFilter : quickFilters) {
154                String clause = quickFilter.getClause();
155                if (!quickFiltersClause.isEmpty() && clause != null) {
156                    quickFiltersClause = NXQLQueryBuilder.appendClause(quickFiltersClause, clause);
157                } else {
158                    quickFiltersClause = clause != null ? clause : "";
159                }
160            }
161        }
162
163        WhereClauseDefinition whereClause = def.getWhereClause();
164        if (whereClause == null) {
165
166            String originalPattern = def.getPattern();
167            String pattern = quickFiltersClause.isEmpty() ? originalPattern
168                    : StringUtils.containsIgnoreCase(originalPattern, " WHERE ")
169                            ? NXQLQueryBuilder.appendClause(originalPattern, quickFiltersClause)
170                            : originalPattern + " WHERE " + quickFiltersClause;
171
172            ret = PageProviderQueryBuilder.makeQuery(pattern, getParameters(), def.getQuotePatternParameters(),
173                    def.getEscapePatternParameters(), isNativeQuery());
174        } else {
175
176            DocumentModel searchDocumentModel = getSearchDocumentModel();
177            if (searchDocumentModel == null) {
178                throw new NuxeoException(String.format(
179                        "Cannot build query of provider '%s': " + "no search document model is set", getName()));
180            }
181            ret = PageProviderQueryBuilder.makeQuery(searchDocumentModel, whereClause, quickFiltersClause,
182                    getParameters(), isNativeQuery());
183        }
184        return ret;
185    }
186
187    @Override
188    protected void pageChanged() {
189        currentPageDocuments = null;
190        currentAggregates = null;
191        super.pageChanged();
192    }
193
194    @Override
195    public void refresh() {
196        currentPageDocuments = null;
197        currentAggregates = null;
198        super.refresh();
199    }
200
201    protected CoreSession getCoreSession() {
202        Map<String, Serializable> props = getProperties();
203        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
204        if (coreSession == null) {
205            throw new NuxeoException("cannot find core session");
206        }
207        return coreSession;
208    }
209
210    protected boolean searchOnAllRepositories() {
211        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
212        if (value == null) {
213            return false;
214        }
215        return Boolean.valueOf(value);
216    }
217
218    public boolean isNativeQuery() {
219        return true;
220    }
221}