001/*
002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.elasticsearch.provider;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.commons.lang3.StringUtils;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.elasticsearch.index.query.QueryBuilder;
032import org.nuxeo.ecm.core.api.CoreSession;
033import org.nuxeo.ecm.core.api.DocumentModel;
034import org.nuxeo.ecm.core.api.DocumentModelList;
035import org.nuxeo.ecm.core.api.NuxeoException;
036import org.nuxeo.ecm.core.api.SortInfo;
037import org.nuxeo.ecm.core.query.QueryParseException;
038import org.nuxeo.ecm.platform.query.api.AbstractPageProvider;
039import org.nuxeo.ecm.platform.query.api.Aggregate;
040import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
041import org.nuxeo.ecm.platform.query.api.Bucket;
042import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
043import org.nuxeo.ecm.platform.query.api.QuickFilter;
044import org.nuxeo.ecm.platform.query.api.WhereClauseDefinition;
045import org.nuxeo.ecm.platform.query.nxql.NXQLQueryBuilder;
046import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
047import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
048import org.nuxeo.elasticsearch.api.ElasticSearchService;
049import org.nuxeo.elasticsearch.api.EsResult;
050import org.nuxeo.elasticsearch.query.NxQueryBuilder;
051import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder;
052import org.nuxeo.runtime.api.Framework;
053
054public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> {
055
056    public static final String CORE_SESSION_PROPERTY = "coreSession";
057
058    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
059
060    protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class);
061
062    private static final long serialVersionUID = 1L;
063
064    protected List<DocumentModel> currentPageDocuments;
065
066    protected Map<String, Aggregate<? extends Bucket>> currentAggregates;
067
068    @Override
069    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
070        getCurrentPage();
071        return currentAggregates;
072    }
073
074    @Override
075    public List<DocumentModel> getCurrentPage() {
076        long t0 = System.currentTimeMillis();
077
078        // use a cache
079        if (currentPageDocuments != null) {
080            return currentPageDocuments;
081        }
082        error = null;
083        errorMessage = null;
084        if (log.isDebugEnabled()) {
085            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
086                    getMinMaxPageSize(), getCurrentPageOffset()));
087        }
088        currentPageDocuments = new ArrayList<>();
089        // Build the ES query
090        QueryBuilder query = makeQueryBuilder();
091        SortInfo[] sortArray = null;
092        if (sortInfos != null) {
093            sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]);
094        }
095        // Execute the ES query
096        ElasticSearchService ess = Framework.getService(ElasticSearchService.class);
097        try {
098            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query)
099                                                                         .offset((int) getCurrentPageOffset())
100                                                                         .limit((int) getMinMaxPageSize())
101                                                                         .addSort(sortArray)
102                                                                         .addAggregates(buildAggregates());
103            if (searchOnAllRepositories()) {
104                nxQuery.searchOnAllRepositories();
105            }
106
107            List<String> highlightFields = getHighlights();
108            if (highlightFields != null && !highlightFields.isEmpty()) {
109                nxQuery.highlight(highlightFields);
110            }
111
112            EsResult ret = ess.queryAndAggregate(nxQuery);
113            DocumentModelList dmList = ret.getDocuments();
114            currentAggregates = new HashMap<>(ret.getAggregates().size());
115            for (Aggregate<Bucket> agg : ret.getAggregates()) {
116                currentAggregates.put(agg.getId(), agg);
117            }
118            setResultsCount(dmList.totalSize());
119            currentPageDocuments = dmList;
120        } catch (QueryParseException e) {
121            error = e;
122            errorMessage = e.getMessage();
123            log.warn(e.getMessage(), e);
124        }
125
126        // send event for statistics !
127        fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments,
128                System.currentTimeMillis() - t0);
129
130        return currentPageDocuments;
131    }
132
133    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
134        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
135        boolean skip = isSkipAggregates();
136        for (AggregateDefinition def : getAggregateDefinitions()) {
137            AggregateEsBase<? extends Bucket> agg = AggregateFactory.create(def, getSearchDocumentModel());
138            if (!skip || !agg.getSelection().isEmpty()) {
139                // if we want to skip aggregates but one is selected, it has to be computed to filter the result set
140                ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
141            }
142        }
143        return ret;
144    }
145
146    @Override
147    public boolean hasAggregateSupport() {
148        return true;
149    }
150
151    protected QueryBuilder makeQueryBuilder() {
152        QueryBuilder ret;
153        PageProviderDefinition def = getDefinition();
154        List<QuickFilter> quickFilters = getQuickFilters();
155        String quickFiltersClause = "";
156
157        if (quickFilters != null && !quickFilters.isEmpty()) {
158            for (QuickFilter quickFilter : quickFilters) {
159                String clause = quickFilter.getClause();
160                if (!quickFiltersClause.isEmpty() && clause != null) {
161                    quickFiltersClause = NXQLQueryBuilder.appendClause(quickFiltersClause, clause);
162                } else {
163                    quickFiltersClause = clause != null ? clause : "";
164                }
165            }
166        }
167
168        WhereClauseDefinition whereClause = def.getWhereClause();
169        if (whereClause == null) {
170
171            String originalPattern = def.getPattern();
172            String pattern = quickFiltersClause.isEmpty() ? originalPattern
173                    : StringUtils.containsIgnoreCase(originalPattern, " WHERE ")
174                            ? NXQLQueryBuilder.appendClause(originalPattern, quickFiltersClause)
175                            : originalPattern + " WHERE " + quickFiltersClause;
176
177            ret = PageProviderQueryBuilder.makeQuery(pattern, getParameters(), def.getQuotePatternParameters(),
178                    def.getEscapePatternParameters(), isNativeQuery());
179        } else {
180
181            DocumentModel searchDocumentModel = getSearchDocumentModel();
182            if (searchDocumentModel == null) {
183                throw new NuxeoException(String.format(
184                        "Cannot build query of provider '%s': " + "no search document model is set", getName()));
185            }
186            ret = PageProviderQueryBuilder.makeQuery(searchDocumentModel, whereClause, quickFiltersClause,
187                    getParameters(), isNativeQuery());
188        }
189        return ret;
190    }
191
192    @Override
193    protected void pageChanged() {
194        currentPageDocuments = null;
195        currentAggregates = null;
196        super.pageChanged();
197    }
198
199    @Override
200    public void refresh() {
201        currentPageDocuments = null;
202        currentAggregates = null;
203        super.refresh();
204    }
205
206    protected CoreSession getCoreSession() {
207        Map<String, Serializable> props = getProperties();
208        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
209        if (coreSession == null) {
210            throw new NuxeoException("cannot find core session");
211        }
212        return coreSession;
213    }
214
215    protected boolean searchOnAllRepositories() {
216        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
217        if (value == null) {
218            return false;
219        }
220        return Boolean.parseBoolean(value);
221    }
222
223    public boolean isNativeQuery() {
224        return true;
225    }
226}