001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Mariana Cedica <mcedica@nuxeo.com>
016 *     Antoine Taillefer <ataillefer@nuxeo.com>
017 */
018package org.nuxeo.drive.elasticsearch;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Set;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.elasticsearch.action.search.SearchRequestBuilder;
028import org.elasticsearch.action.search.SearchResponse;
029import org.elasticsearch.action.search.SearchType;
030import org.elasticsearch.client.Client;
031import org.elasticsearch.index.query.AndFilterBuilder;
032import org.elasticsearch.index.query.BoolFilterBuilder;
033import org.elasticsearch.index.query.FilterBuilder;
034import org.elasticsearch.index.query.FilterBuilders;
035import org.elasticsearch.index.query.OrFilterBuilder;
036import org.elasticsearch.index.query.QueryBuilder;
037import org.elasticsearch.index.query.QueryBuilders;
038import org.elasticsearch.index.query.RangeFilterBuilder;
039import org.elasticsearch.index.query.TermsFilterBuilder;
040import org.elasticsearch.search.SearchHit;
041import org.elasticsearch.search.SearchHits;
042import org.elasticsearch.search.sort.SortOrder;
043import org.nuxeo.drive.service.SynchronizationRoots;
044import org.nuxeo.drive.service.impl.AuditChangeFinder;
045import org.nuxeo.ecm.core.api.CoreSession;
046import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
047import org.nuxeo.ecm.platform.audit.api.LogEntry;
048import org.nuxeo.elasticsearch.ElasticSearchConstants;
049import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
050import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
051import org.nuxeo.runtime.api.Framework;
052
053/**
054 * Override the JPA audit based change finder to execute query in ES.
055 * <p>
056 * The structure of the query executed by the {@link AuditChangeFinder} is:
057 *
058 * <pre>
059 * from LogEntry log where log.repositoryId = :repositoryId
060 * 
061 * + AND if ActiveRoots (activeRoots) NOT empty
062 * 
063 * from LogEntry log where log.repositoryId = :repositoryId and (
064 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or
065 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
066 * 
067 * 
068 * if ActiveRoots EMPTY:
069 * 
070 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
071 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
072 * 
073 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
074 * log.repositoryId asc, log.eventDate desc
075 * </pre>
076 *
077 * @since 7.3
078 */
079public class ESAuditChangeFinder extends AuditChangeFinder {
080
081    private static final long serialVersionUID = 1L;
082
083    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
084
085    protected Client esClient = null;
086
087    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
088            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
089
090        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()).setTypes(
091                ElasticSearchConstants.ENTRY_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
092
093        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
094        FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
095                upperBound, integerBounds, limit);
096        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
097
098        builder.addSort("repositoryId", SortOrder.ASC);
099        builder.addSort("eventDate", SortOrder.DESC);
100
101        List<LogEntry> entries = new ArrayList<>();
102        SearchResponse searchResponse = builder.setFrom(0).setSize(limit).execute().actionGet();
103        for (SearchHit hit : searchResponse.getHits()) {
104            try {
105                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
106            } catch (IOException e) {
107                log.error("Error while reading Audit Entry from ES", e);
108            }
109        }
110        return entries;
111    }
112
113    protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
114            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
115        AndFilterBuilder filterBuilder = FilterBuilders.andFilter();
116
117        // from LogEntry log where log.repositoryId = :repositoryId
118        FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName());
119        filterBuilder.add(repositoryClauseFilter);
120
121        if (activeRoots.getPaths().isEmpty()) {
122            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
123            filterBuilder.add(getDriveLogsQueryClause());
124        } else {
125
126            OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter();
127
128            // LIST_DOC_EVENTS_IDS_QUERY
129
130            // (log.category = 'eventDocumentCategory' and (log.eventId =
131            // 'documentCreated' or log.eventId = 'documentModified' or
132            // log.eventId = 'documentMoved' or log.eventId =
133            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
134            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’) or log.category =
135            // 'eventLifeCycleCategory' and log.eventId =
136            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
137            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
138                    "documentRestored", "addedToCollection", "documentProxyPublished" };
139            OrFilterBuilder orEventsFilter = FilterBuilders.orFilter();
140            orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true));
141            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" },
142                    true));
143            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
144
145            // ROOT_PATHS log.docPath like :rootPath1
146            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
147                OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter();
148                rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths()));
149                rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds));
150
151                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
152                // COLECTIONS_PATHS)
153                // or (log.category = 'NuxeoDrive' and log.eventId !=
154                // 'rootUnregistered') )
155                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter));
156            } else {
157                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter,
158                        getCurrentRootsClause(activeRoots.getPaths())));
159            }
160
161            orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause());
162
163            filterBuilder.add(orFilterBuilderIfActiveRoots);
164        }
165
166        filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound));
167        return filterBuilder;
168
169    }
170
171    protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
172        RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id");
173        rangeFilter.gt(lowerBound);
174        rangeFilter.lte(upperBound);
175        return rangeFilter;
176    }
177
178    protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
179        return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds);
180    }
181
182    protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) {
183        OrFilterBuilder orFilterRoots = FilterBuilders.orFilter();
184        for (String rootPath : rootPaths) {
185            orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath));
186        }
187        return orFilterRoots;
188    }
189
190    protected BoolFilterBuilder getDriveLogsQueryClause() {
191        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
192        filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive"));
193        filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered"));
194        return filterBuilder;
195    }
196
197    protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
198        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
199        filterBuilder.must(FilterBuilders.termFilter("category", category));
200        if (eventIds != null && eventIds.length > 0) {
201            if (eventIds.length == 1) {
202                if (shouldMatch) {
203                    filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
204                } else {
205                    filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0]));
206                }
207            } else {
208                if (shouldMatch) {
209                    filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
210                } else {
211                    filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds));
212                }
213            }
214        }
215        return filterBuilder;
216    }
217
218    @Override
219    public long getUpperBound() {
220        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()).setTypes(
221                ElasticSearchConstants.ENTRY_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
222        // TODO refactor this to use max clause
223        builder.setQuery(QueryBuilders.matchAllQuery());
224        builder.addSort("id", SortOrder.DESC);
225        builder.setFrom(0);
226        builder.setSize(1);
227        SearchResponse searchResponse = builder.execute().actionGet();
228        List<LogEntry> entries = new ArrayList<>();
229        SearchHits hits = searchResponse.getHits();
230        for (SearchHit hit : hits) {
231            try {
232                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
233            } catch (IOException e) {
234                log.error("Error while reading Audit Entry from ES", e);
235            }
236        }
237        return entries.size() > 0 ? entries.get(0).getId() : -1;
238    }
239
240    @Override
241    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
242            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
243        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
244                upperBound, integerBounds, limit);
245        // Post filter the output to remove (un)registration that are unrelated
246        // to the current user.
247        // TODO move this to the ES query
248        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
249        String principalName = session.getPrincipal().getName();
250        for (LogEntry entry : entries) {
251            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
252            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
253                // ignore event that only impact other users
254                continue;
255            }
256            if (log.isDebugEnabled()) {
257                if (log.isDebugEnabled()) {
258                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
259                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
260                            entry.getDocPath()));
261                }
262            }
263            postFilteredEntries.add(entry);
264        }
265        return postFilteredEntries;
266    }
267
268    protected Client getClient() {
269        if (esClient == null) {
270            log.info("Activate Elasticsearch backend for Audit");
271            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
272            esClient = esa.getClient();
273        }
274        return esClient;
275    }
276
277    protected String getESIndexName() {
278        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
279        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
280    }
281}