001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bdelbosc
016 */
017
018package org.nuxeo.elasticsearch.provider;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.index.query.QueryBuilder;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.DocumentModelList;
032import org.nuxeo.ecm.core.api.NuxeoException;
033import org.nuxeo.ecm.core.query.QueryParseException;
034import org.nuxeo.ecm.platform.query.api.Aggregate;
035import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
036import org.nuxeo.ecm.platform.query.api.Bucket;
037import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
038import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
039import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
040import org.nuxeo.elasticsearch.api.ElasticSearchService;
041import org.nuxeo.elasticsearch.api.EsResult;
042import org.nuxeo.elasticsearch.query.NxQueryBuilder;
043import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
044import org.nuxeo.runtime.api.Framework;
045
046/**
047 * Elasticsearch Page provider that converts the NXQL query build by CoreQueryDocumentPageProvider.
048 *
049 * @since 5.9.3
050 */
051public class ElasticSearchNxqlPageProvider extends CoreQueryDocumentPageProvider {
052
053    public static final String CORE_SESSION_PROPERTY = "coreSession";
054
055    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
056
057    protected static final Log log = LogFactory.getLog(ElasticSearchNxqlPageProvider.class);
058
059    private static final long serialVersionUID = 1L;
060
061    protected List<DocumentModel> currentPageDocuments;
062
063    protected HashMap<String, Aggregate<? extends Bucket>> currentAggregates;
064
065    @Override
066    public List<DocumentModel> getCurrentPage() {
067
068        long t0 = System.currentTimeMillis();
069
070        // use a cache
071        if (currentPageDocuments != null) {
072            return currentPageDocuments;
073        }
074        error = null;
075        errorMessage = null;
076        if (log.isDebugEnabled()) {
077            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
078                    getMinMaxPageSize(), getCurrentPageOffset()));
079        }
080        currentPageDocuments = new ArrayList<DocumentModel>();
081        CoreSession coreSession = getCoreSession();
082        if (query == null) {
083            buildQuery(coreSession);
084        }
085        if (query == null) {
086            throw new NuxeoException(String.format("Cannot perform null query: check provider '%s'", getName()));
087        }
088        // Build and execute the ES query
089        ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class);
090        try {
091            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).nxql(query).offset(
092                    (int) getCurrentPageOffset()).limit(getLimit()).addAggregates(buildAggregates());
093            if (searchOnAllRepositories()) {
094                nxQuery.searchOnAllRepositories();
095            }
096            EsResult ret = ess.queryAndAggregate(nxQuery);
097            DocumentModelList dmList = ret.getDocuments();
098            currentAggregates = new HashMap<>(ret.getAggregates().size());
099            for (Aggregate<Bucket> agg : ret.getAggregates()) {
100                currentAggregates.put(agg.getId(), agg);
101            }
102            setResultsCount(dmList.totalSize());
103            currentPageDocuments = dmList;
104        } catch (QueryParseException e) {
105            error = e;
106            errorMessage = e.getMessage();
107            log.warn(e.getMessage(), e);
108        }
109
110        // send event for statistics !
111        fireSearchEvent(getCoreSession().getPrincipal(), query, currentPageDocuments, System.currentTimeMillis() - t0);
112
113        return currentPageDocuments;
114    }
115
116    protected int getLimit() {
117        int ret = (int) getMinMaxPageSize();
118        if (ret == 0) {
119            ret = -1;
120        }
121        return ret;
122    }
123
124    public QueryBuilder getCurrentQueryAsEsBuilder() {
125        String nxql = getCurrentQuery();
126        return NxqlQueryConverter.toESQueryBuilder(nxql);
127    }
128
129    @Override
130    protected void pageChanged() {
131        currentPageDocuments = null;
132        currentAggregates = null;
133        super.pageChanged();
134    }
135
136    @Override
137    public void refresh() {
138        currentPageDocuments = null;
139        currentAggregates = null;
140        super.refresh();
141    }
142
143    @Override
144    protected CoreSession getCoreSession() {
145        Map<String, Serializable> props = getProperties();
146        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
147        if (coreSession == null) {
148            throw new NuxeoException("cannot find core session");
149        }
150        return coreSession;
151    }
152
153    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
154        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
155        for (AggregateDefinition def : getAggregateDefinitions()) {
156            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
157        }
158        return ret;
159    }
160
161    protected boolean searchOnAllRepositories() {
162        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
163        if (value == null) {
164            return false;
165        }
166        return Boolean.valueOf(value);
167    }
168
169    @Override
170    public boolean hasAggregateSupport() {
171        return true;
172    }
173
174    @Override
175    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
176        getCurrentPage();
177        return currentAggregates;
178    }
179
180    /**
181     * Extends the default implementation to add results of aggregates
182     *
183     * @param eventProps
184     * @since 7.4
185     */
186    @Override
187    protected void incorporateAggregates(Map<String, Serializable> eventProps) {
188
189        super.incorporateAggregates(eventProps);
190        if (currentAggregates != null) {
191            HashMap<String, Serializable> aggregateMatches = new HashMap<String, Serializable>();
192            for (String key : currentAggregates.keySet()) {
193                Aggregate<? extends Bucket> ag = currentAggregates.get(key);
194                ArrayList<HashMap<String, Serializable>> buckets = new ArrayList<HashMap<String, Serializable>>();
195                for (Bucket bucket : ag.getBuckets()) {
196                    HashMap<String, Serializable> b = new HashMap<String, Serializable>();
197                    b.put("key", bucket.getKey());
198                    b.put("count", bucket.getDocCount());
199                    buckets.add(b);
200                }
201                aggregateMatches.put(key, buckets);
202            }
203            eventProps.put("aggregatesMatches", aggregateMatches);
204        }
205    }
206}