001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020package org.nuxeo.drive.elasticsearch;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.elasticsearch.action.search.SearchRequestBuilder;
030import org.elasticsearch.action.search.SearchResponse;
031import org.elasticsearch.action.search.SearchType;
032import org.elasticsearch.client.Client;
033import org.elasticsearch.index.query.AndFilterBuilder;
034import org.elasticsearch.index.query.BoolFilterBuilder;
035import org.elasticsearch.index.query.FilterBuilder;
036import org.elasticsearch.index.query.FilterBuilders;
037import org.elasticsearch.index.query.OrFilterBuilder;
038import org.elasticsearch.index.query.QueryBuilder;
039import org.elasticsearch.index.query.QueryBuilders;
040import org.elasticsearch.index.query.RangeFilterBuilder;
041import org.elasticsearch.index.query.TermsFilterBuilder;
042import org.elasticsearch.search.SearchHit;
043import org.elasticsearch.search.SearchHits;
044import org.elasticsearch.search.sort.SortOrder;
045import org.nuxeo.drive.service.SynchronizationRoots;
046import org.nuxeo.drive.service.impl.AuditChangeFinder;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
049import org.nuxeo.ecm.platform.audit.api.LogEntry;
050import org.nuxeo.elasticsearch.ElasticSearchConstants;
051import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
052import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
053import org.nuxeo.runtime.api.Framework;
054
055/**
 * Overrides the JPA audit based change finder to execute its audit queries in Elasticsearch (ES).
057 * <p>
058 * The structure of the query executed by the {@link AuditChangeFinder} is:
059 *
060 * <pre>
061 * from LogEntry log where log.repositoryId = :repositoryId
062 * 
063 * + AND if ActiveRoots (activeRoots) NOT empty
064 * 
065 * from LogEntry log where log.repositoryId = :repositoryId and (
 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS) or
067 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
068 * 
069 * 
070 * if ActiveRoots EMPTY:
071 * 
072 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
073 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
074 * 
075 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
076 * log.repositoryId asc, log.eventDate desc
077 * </pre>
078 *
079 * @since 7.3
080 */
081public class ESAuditChangeFinder extends AuditChangeFinder {
082
083    private static final long serialVersionUID = 1L;
084
085    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
086
087    protected Client esClient = null;
088
089    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
090            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
091
092        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
093                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
094                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
095
096        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
097        FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
098                upperBound, integerBounds, limit);
099        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
100
101        builder.addSort("repositoryId", SortOrder.ASC);
102        builder.addSort("eventDate", SortOrder.DESC);
103
104        List<LogEntry> entries = new ArrayList<>();
105        SearchResponse searchResponse = builder.setFrom(0).setSize(limit).execute().actionGet();
106        for (SearchHit hit : searchResponse.getHits()) {
107            try {
108                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
109            } catch (IOException e) {
110                log.error("Error while reading Audit Entry from ES", e);
111            }
112        }
113        return entries;
114    }
115
116    protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
117            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
118        AndFilterBuilder filterBuilder = FilterBuilders.andFilter();
119
120        // from LogEntry log where log.repositoryId = :repositoryId
121        FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName());
122        filterBuilder.add(repositoryClauseFilter);
123
124        if (activeRoots.getPaths().isEmpty()) {
125            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
126            filterBuilder.add(getDriveLogsQueryClause());
127        } else {
128
129            OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter();
130
131            // LIST_DOC_EVENTS_IDS_QUERY
132
133            // (log.category = 'eventDocumentCategory' and (log.eventId =
134            // 'documentCreated' or log.eventId = 'documentModified' or
135            // log.eventId = 'documentMoved' or log.eventId =
136            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
137            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
138            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
139            // 'eventLifeCycleCategory' and log.eventId =
140            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
141            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
142                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
143                    "documentUnlocked" };
144            OrFilterBuilder orEventsFilter = FilterBuilders.orFilter();
145            orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true));
146            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" },
147                    true));
148            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
149
150            // ROOT_PATHS log.docPath like :rootPath1
151            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
152                OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter();
153                rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths()));
154                rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds));
155
156                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
157                // COLECTIONS_PATHS)
158                // or (log.category = 'NuxeoDrive' and log.eventId !=
159                // 'rootUnregistered') )
160                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter));
161            } else {
162                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter,
163                        getCurrentRootsClause(activeRoots.getPaths())));
164            }
165
166            orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause());
167
168            filterBuilder.add(orFilterBuilderIfActiveRoots);
169        }
170
171        filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound));
172        return filterBuilder;
173
174    }
175
176    protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
177        RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id");
178        rangeFilter.gt(lowerBound);
179        rangeFilter.lte(upperBound);
180        return rangeFilter;
181    }
182
183    protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
184        return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds);
185    }
186
187    protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) {
188        OrFilterBuilder orFilterRoots = FilterBuilders.orFilter();
189        for (String rootPath : rootPaths) {
190            orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath));
191        }
192        return orFilterRoots;
193    }
194
195    protected BoolFilterBuilder getDriveLogsQueryClause() {
196        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
197        filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive"));
198        filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered"));
199        return filterBuilder;
200    }
201
202    protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
203        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
204        filterBuilder.must(FilterBuilders.termFilter("category", category));
205        if (eventIds != null && eventIds.length > 0) {
206            if (eventIds.length == 1) {
207                if (shouldMatch) {
208                    filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
209                } else {
210                    filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0]));
211                }
212            } else {
213                if (shouldMatch) {
214                    filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
215                } else {
216                    filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds));
217                }
218            }
219        }
220        return filterBuilder;
221    }
222
223    @Override
224    public long getUpperBound() {
225        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
226                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
227                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
228        // TODO refactor this to use max clause
229        builder.setQuery(QueryBuilders.matchAllQuery());
230        builder.addSort("id", SortOrder.DESC);
231        builder.setFrom(0);
232        builder.setSize(1);
233        SearchResponse searchResponse = builder.execute().actionGet();
234        List<LogEntry> entries = new ArrayList<>();
235        SearchHits hits = searchResponse.getHits();
236        for (SearchHit hit : hits) {
237            try {
238                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
239            } catch (IOException e) {
240                log.error("Error while reading Audit Entry from ES", e);
241            }
242        }
243        return entries.size() > 0 ? entries.get(0).getId() : -1;
244    }
245
246    @Override
247    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
248            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
249        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
250                upperBound, integerBounds, limit);
251        // Post filter the output to remove (un)registration that are unrelated
252        // to the current user.
253        // TODO move this to the ES query
254        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
255        String principalName = session.getPrincipal().getName();
256        for (LogEntry entry : entries) {
257            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
258            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
259                // ignore event that only impact other users
260                continue;
261            }
262            if (log.isDebugEnabled()) {
263                if (log.isDebugEnabled()) {
264                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
265                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
266                            entry.getDocPath()));
267                }
268            }
269            postFilteredEntries.add(entry);
270        }
271        return postFilteredEntries;
272    }
273
274    protected Client getClient() {
275        if (esClient == null) {
276            log.info("Activate Elasticsearch backend for Audit");
277            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
278            esClient = esa.getClient();
279        }
280        return esClient;
281    }
282
283    protected String getESIndexName() {
284        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
285        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
286    }
287}