001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.elasticsearch.provider;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.index.query.QueryBuilder;
031import org.nuxeo.ecm.core.api.CoreSession;
032import org.nuxeo.ecm.core.api.DocumentModel;
033import org.nuxeo.ecm.core.api.DocumentModelList;
034import org.nuxeo.ecm.core.api.NuxeoException;
035import org.nuxeo.ecm.core.query.QueryParseException;
036import org.nuxeo.ecm.platform.query.api.Aggregate;
037import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
038import org.nuxeo.ecm.platform.query.api.Bucket;
039import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
040import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
041import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
042import org.nuxeo.elasticsearch.api.ElasticSearchService;
043import org.nuxeo.elasticsearch.api.EsResult;
044import org.nuxeo.elasticsearch.query.NxQueryBuilder;
045import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
046import org.nuxeo.runtime.api.Framework;
047
048/**
049 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider.
050 *
051 * @since 5.9.3
052 */
053public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider {
054
055    public static final String CORE_SESSION_PROPERTY = "coreSession";
056
057    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
058
059    protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class);
060
061    private static final long serialVersionUID = 1L;
062
063    protected List<DocumentModel> currentPageDocuments;
064
065    protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates;
066
067    @Override
068    public List<DocumentModel> getCurrentPage() {
069
070        long t0 = System.currentTimeMillis();
071
072        // use a cache
073        if (currentPageDocuments != null) {
074            return currentPageDocuments;
075        }
076        error = null;
077        errorMessage = null;
078        if (log.isDebugEnabled()) {
079            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
080                    getMinMaxPageSize(), getCurrentPageOffset()));
081        }
082        currentPageDocuments = new ArrayList<DocumentModel>();
083        CoreSession coreSession = getCoreSession();
084        if (query == null) {
085            buildQuery(coreSession);
086        }
087        if (query == null) {
088            throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName()));
089        }
090        // Build and execute the ES query
091        ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class);
092        try {
093            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query).offset(
094                    (int) getCurrentPageOffset()).limit(getLimit()).addAggregates(buildAggregates());
095            if (searchOnAllRepositories()) {
096                nxQuery.searchOnAllRepositories();
097            }
098            EsResult ret = ess.queryAndAggregate(nxQuery);
099            DocumentModelList dmList = ret.getDocuments();
100            currentAggregates = new HashMap<>(ret.getAggregates().size());
101            for (Aggregate<Bucket> agg : ret.getAggregates()) {
102                currentAggregates.put(agg.getId(), agg);
103            }
104            setResultsCount(dmList.totalSize());
105            currentPageDocuments = dmList;
106        } catch (QueryParseException e) {
107            error = e;
108            errorMessage = e.getMessage();
109            log.warn(e.getMessage(), e);
110        }
111
112        // send event for statistics !
113        fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0);
114
115        return currentPageDocuments;
116    }
117
118    protected int getLimit() {
119        int ret = (int) getMinMaxPageSize();
120        if (ret == 0) {
121            ret = -1;
122        }
123        return ret;
124    }
125
126    public QueryBuilder getCurrentQueryAsEsBuilder() {
127        String nxql = getCurrentQuery();
128        return NxqlQueryConverter.toESQueryBuilder(nxql);
129    }
130
131    @Override
132    protected void pageChanged() {
133        currentPageDocuments = null;
134        currentAggregates = null;
135        super.pageChanged();
136    }
137
138    @Override
139    public void refresh() {
140        currentPageDocuments = null;
141        currentAggregates = null;
142        super.refresh();
143    }
144
145    @Override
146    protected CoreSession getCoreSession() {
147        Map<String, Serializable> props = getProperties();
148        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
149        if (coreSession == null) {
150            throw new NuxeoException("cannot find core session");
151        }
152        return coreSession;
153    }
154
155    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
156        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
157        for (AggregateDefinition def : getAggregateDefinitions()) {
158            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
159        }
160        return ret;
161    }
162
163    protected boolean searchOnAllRepositories() {
164        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
165        if (value == null) {
166            return false;
167        }
168        return Boolean.valueOf(value);
169    }
170
171    @Override
172    public boolean hasAggregateSupport() {
173        return true;
174    }
175
176    @Override
177    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
178        getCurrentPage();
179        return currentAggregates;
180    }
181
182    /**
183     * Extends the default implementation to add results of aggregates
184     *
185     * @param eventProps
186     * @since 7.4
187     */
188    @Override
189    protected void incorporateAggregates(Map<String, Serializable> eventProps) {
190
191        super.incorporateAggregates(eventProps);
192        if (currentAggregates != null) {
193            HashMap<String, Serializable> aggregateMatches = new HashMap<String, Serializable>();
194            for (String key : currentAggregates.keySet()) {
195                Aggregate<? extends Bucket> ag = currentAggregates.get(key);
196                ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<HashMap<String, Serializable>>();
197                for (Bucket bucket : ag.getBuckets()) {
198                    HashMap<String, Serializable> b = new HashMap<String, Serializable>();
199                    b.put("key", bucket.getKey());
200                    b.put("count", bucket.getDocCount());
201                    buckets.add(b);
202                }
203                aggregateMatches.put(key, buckets);
204            }
205            eventProps.put("aggregatesMatches", aggregateMatches);
206        }
207    }
208}