001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bdelbosc
016 */
017
018package org.nuxeo.elasticsearch.provider;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.index.query.QueryBuilder;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.DocumentModelList;
032import org.nuxeo.ecm.core.api.NuxeoException;
033import org.nuxeo.ecm.core.api.SortInfo;
034import org.nuxeo.ecm.core.query.QueryParseException;
035import org.nuxeo.ecm.platform.query.api.AbstractPageProvider;
036import org.nuxeo.ecm.platform.query.api.Aggregate;
037import org.nuxeo.ecm.platform.query.api.AggregateDefinition;
038import org.nuxeo.ecm.platform.query.api.Bucket;
039import org.nuxeo.ecm.platform.query.api.PageProviderDefinition;
040import org.nuxeo.elasticsearch.aggregate.AggregateEsBase;
041import org.nuxeo.elasticsearch.aggregate.AggregateFactory;
042import org.nuxeo.elasticsearch.api.ElasticSearchService;
043import org.nuxeo.elasticsearch.api.EsResult;
044import org.nuxeo.elasticsearch.query.NxQueryBuilder;
045import org.nuxeo.elasticsearch.query.PageProviderQueryBuilder;
046import org.nuxeo.runtime.api.Framework;
047
048public class ElasticSearchNativePageProvider extends AbstractPageProvider<DocumentModel> {
049
050    public static final String CORE_SESSION_PROPERTY = "coreSession";
051
052    public static final String SEARCH_ON_ALL_REPOSITORIES_PROPERTY = "searchAllRepositories";
053
054    protected static final Log log = LogFactory.getLog(ElasticSearchNativePageProvider.class);
055
056    private static final long serialVersionUID = 1L;
057
058    protected List<DocumentModel> currentPageDocuments;
059
060    protected Map<String, Aggregate<? extends Bucket>> currentAggregates;
061
062    @Override
063    public Map<String, Aggregate<? extends Bucket>> getAggregates() {
064        getCurrentPage();
065        return currentAggregates;
066    }
067
068    @Override
069    public List<DocumentModel> getCurrentPage() {
070        long t0 = System.currentTimeMillis();
071
072        // use a cache
073        if (currentPageDocuments != null) {
074            return currentPageDocuments;
075        }
076        error = null;
077        errorMessage = null;
078        if (log.isDebugEnabled()) {
079            log.debug(String.format("Perform query for provider '%s': with pageSize=%d, offset=%d", getName(),
080                    getMinMaxPageSize(), getCurrentPageOffset()));
081        }
082        currentPageDocuments = new ArrayList<DocumentModel>();
083        // Build the ES query
084        QueryBuilder query = makeQueryBuilder();
085        SortInfo[] sortArray = null;
086        if (sortInfos != null) {
087            sortArray = sortInfos.toArray(new SortInfo[sortInfos.size()]);
088        }
089        // Execute the ES query
090        ElasticSearchService ess = Framework.getLocalService(ElasticSearchService.class);
091        try {
092            NxQueryBuilder nxQuery = new NxQueryBuilder(getCoreSession()).esQuery(query).offset(
093                    (int) getCurrentPageOffset()).limit((int) getMinMaxPageSize()).addSort(sortArray).addAggregates(
094                    buildAggregates());
095            if (searchOnAllRepositories()) {
096                nxQuery.searchOnAllRepositories();
097            }
098            EsResult ret = ess.queryAndAggregate(nxQuery);
099            DocumentModelList dmList = ret.getDocuments();
100            currentAggregates = new HashMap<>(ret.getAggregates().size());
101            for (Aggregate<Bucket> agg : ret.getAggregates()) {
102                currentAggregates.put(agg.getId(), agg);
103            }
104            setResultsCount(dmList.totalSize());
105            currentPageDocuments = dmList;
106        } catch (QueryParseException e) {
107            error = e;
108            errorMessage = e.getMessage();
109            log.warn(e.getMessage(), e);
110        }
111
112        // send event for statistics !
113        fireSearchEvent(getCoreSession().getPrincipal(), query.toString(), currentPageDocuments,
114                System.currentTimeMillis() - t0);
115
116        return currentPageDocuments;
117    }
118
119    private List<AggregateEsBase<? extends Bucket>> buildAggregates() {
120        ArrayList<AggregateEsBase<? extends Bucket>> ret = new ArrayList<>(getAggregateDefinitions().size());
121        for (AggregateDefinition def : getAggregateDefinitions()) {
122            ret.add(AggregateFactory.create(def, getSearchDocumentModel()));
123        }
124        return ret;
125    }
126
127    @Override
128    public boolean hasAggregateSupport() {
129        return true;
130    }
131
132    protected QueryBuilder makeQueryBuilder() {
133        QueryBuilder ret;
134        PageProviderDefinition def = getDefinition();
135        if (def.getWhereClause() == null) {
136            ret = PageProviderQueryBuilder.makeQuery(def.getPattern(), getParameters(),
137                    def.getQuotePatternParameters(), def.getEscapePatternParameters(), isNativeQuery());
138        } else {
139            DocumentModel searchDocumentModel = getSearchDocumentModel();
140            if (searchDocumentModel == null) {
141                throw new NuxeoException(String.format("Cannot build query of provider '%s': "
142                        + "no search document model is set", getName()));
143            }
144            ret = PageProviderQueryBuilder.makeQuery(searchDocumentModel, def.getWhereClause(), getParameters(),
145                    isNativeQuery());
146        }
147        return ret;
148    }
149
150    @Override
151    protected void pageChanged() {
152        currentPageDocuments = null;
153        currentAggregates = null;
154        super.pageChanged();
155    }
156
157    @Override
158    public void refresh() {
159        currentPageDocuments = null;
160        currentAggregates = null;
161        super.refresh();
162    }
163
164    protected CoreSession getCoreSession() {
165        Map<String, Serializable> props = getProperties();
166        CoreSession coreSession = (CoreSession) props.get(CORE_SESSION_PROPERTY);
167        if (coreSession == null) {
168            throw new NuxeoException("cannot find core session");
169        }
170        return coreSession;
171    }
172
173    protected boolean searchOnAllRepositories() {
174        String value = (String) getProperties().get(SEARCH_ON_ALL_REPOSITORIES_PROPERTY);
175        if (value == null) {
176            return false;
177        }
178        return Boolean.valueOf(value);
179    }
180
181    public boolean isNativeQuery() {
182        return true;
183    }
184}