001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.elasticsearch.provider;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.commons.lang.StringUtils;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.elasticsearch.index.query.QueryBuilder;
032import org.nuxeo.ecm.core.api.CoreSession;
033import org.nuxeo.ecm.core.api.DocumentModel;
034import org.nuxeo.ecm.core.api.DocumentModelList;
035import org.nuxeo.ecm.core.api.NuxeoException;
036import org.nuxeo.ecm.core.api.SortInfo;
037import org.nuxeo.ecm.core.query.QueryParseException;
038import org.nuxeo.ecm.platform.query.api.AbstractPageProvider;
039import org.nuxeo.ecm.platform.query.api.Aggregate;
040import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
041import org.nuxeo.ecm.platform.query.api.Bucket;
042import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
043import org.nuxeo.ecm.platform.query.api.QuickFilter;
044import org.nuxeo.ecm.platform.query.api.WhereClauseDefinition;
045import org.nuxeo.ecm.platform.query.nxql.NXQLQueryBuilder;
046import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
047import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
048import org.nuxeo.elasticsearch.api.ElasticSearchService;
049import org.nuxeo.elasticsearch.api.EsResult;
050import org.nuxeo.elasticsearch.query.NxQueryBuilder;
051import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder;
052import org.nuxeo.runtime.api.Framework;
053
054public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> {
055
056    public static final String CORE_SESSION_PROPERTY = "coreSession";
057
058    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
059
060    protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class);
061
062    private static final long serialVersionUID = 1L;
063
064    protected List<DocumentModel> currentPageDocuments;
065
066    protected Map<String, Aggregate<? extends Bucket>> currentAggregates;
067
068    @Override
069    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
070        getCurrentPage();
071        return currentAggregates;
072    }
073
074    @Override
075    public List<DocumentModel> getCurrentPage() {
076        long t0 = System.currentTimeMillis();
077
078        // use a cache
079        if (currentPageDocuments != null) {
080            return currentPageDocuments;
081        }
082        error = null;
083        errorMessage = null;
084        if (log.isDebugEnabled()) {
085            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
086                    getMinMaxPageSize(), getCurrentPageOffset()));
087        }
088        currentPageDocuments = new ArrayList<DocumentModel>();
089        // Build the ES query
090        QueryBuilder query = makeQueryBuilder();
091        SortInfo[] sortArray = null;
092        if (sortInfos != null) {
093            sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]);
094        }
095        // Execute the ES query
096        ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class);
097        try {
098            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query)
099                                                                         .offset((int) getCurrentPageOffset())
100                                                                         .limit((int) getMinMaxPageSize())
101                                                                         .addSort(sortArray)
102                                                                         .addAggregates(buildAggregates());
103            if (searchOnAllRepositories()) {
104                nxQuery.searchOnAllRepositories();
105            }
106            EsResult ret = ess.queryAndAggregate(nxQuery);
107            DocumentModelList dmList = ret.getDocuments();
108            currentAggregates = new HashMap<>(ret.getAggregates().size());
109            for (Aggregate<Bucket> agg : ret.getAggregates()) {
110                currentAggregates.put(agg.getId(), agg);
111            }
112            setResultsCount(dmList.totalSize());
113            currentPageDocuments = dmList;
114        } catch (QueryParseException e) {
115            error = e;
116            errorMessage = e.getMessage();
117            log.warn(e.getMessage(), e);
118        }
119
120        // send event for statistics !
121        fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments,
122                System.currentTimeMillis() - t0);
123
124        return currentPageDocuments;
125    }
126
127    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
128        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
129        for (AggregateDefinition def : getAggregateDefinitions()) {
130            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
131        }
132        return ret;
133    }
134
135    @Override
136    public boolean hasAggregateSupport() {
137        return true;
138    }
139
140    protected QueryBuilder makeQueryBuilder() {
141        QueryBuilder ret;
142        PageProviderDefinition def = getDefinition();
143        List<QuickFilter> quickFilters = getQuickFilters();
144        String quickFiltersClause = "";
145
146        if (quickFilters != null && !quickFilters.isEmpty()) {
147            for (QuickFilter quickFilter : quickFilters) {
148                String clause = quickFilter.getClause();
149                if (!quickFiltersClause.isEmpty() && clause != null) {
150                    quickFiltersClause = NXQLQueryBuilder.appendClause(quickFiltersClause, clause);
151                } else {
152                    quickFiltersClause = clause != null ? clause : "";
153                }
154            }
155        }
156
157        WhereClauseDefinition whereClause = def.getWhereClause();
158        if (whereClause == null) {
159
160            String originalPattern = def.getPattern();
161            String pattern = quickFiltersClause.isEmpty() ? originalPattern
162                    : StringUtils.containsIgnoreCase(originalPattern, " WHERE ")
163                    ? NXQLQueryBuilder.appendClause(originalPattern, quickFiltersClause)
164                    : originalPattern + " WHERE " + quickFiltersClause;
165
166            ret = PageProviderQueryBuilder.makeQuery(pattern, getParameters(), def.getQuotePatternParameters(),
167                    def.getEscapePatternParameters(), isNativeQuery());
168        } else {
169
170
171            DocumentModel searchDocumentModel = getSearchDocumentModel();
172            if (searchDocumentModel == null) {
173                throw new NuxeoException(String.format(
174                        "Cannot build query of provider '%s': " + "no search document model is set", getName()));
175            }
176            ret = PageProviderQueryBuilder.makeQuery(searchDocumentModel, whereClause, quickFiltersClause,
177                    getParameters(), isNativeQuery());
178        }
179        return ret;
180    }
181
182    @Override
183    protected void pageChanged() {
184        currentPageDocuments = null;
185        currentAggregates = null;
186        super.pageChanged();
187    }
188
189    @Override
190    public void refresh() {
191        currentPageDocuments = null;
192        currentAggregates = null;
193        super.refresh();
194    }
195
196    protected CoreSession getCoreSession() {
197        Map<String, Serializable> props = getProperties();
198        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
199        if (coreSession == null) {
200            throw new NuxeoException("cannot find core session");
201        }
202        return coreSession;
203    }
204
205    protected boolean searchOnAllRepositories() {
206        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
207        if (value == null) {
208            return false;
209        }
210        return Boolean.valueOf(value);
211    }
212
213    public boolean isNativeQuery() {
214        return true;
215    }
216}