001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020package org.nuxeo.drive.elasticsearch;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequestBuilder;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.client.Client;
034import org.elasticsearch.index.query.AndFilterBuilder;
035import org.elasticsearch.index.query.BoolFilterBuilder;
036import org.elasticsearch.index.query.FilterBuilder;
037import org.elasticsearch.index.query.FilterBuilders;
038import org.elasticsearch.index.query.OrFilterBuilder;
039import org.elasticsearch.index.query.QueryBuilder;
040import org.elasticsearch.index.query.QueryBuilders;
041import org.elasticsearch.index.query.RangeFilterBuilder;
042import org.elasticsearch.index.query.TermsFilterBuilder;
043import org.elasticsearch.search.SearchHit;
044import org.elasticsearch.search.SearchHits;
045import org.elasticsearch.search.sort.SortOrder;
046import org.nuxeo.drive.service.SynchronizationRoots;
047import org.nuxeo.drive.service.impl.AuditChangeFinder;
048import org.nuxeo.ecm.core.api.CoreSession;
049import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
050import org.nuxeo.ecm.platform.audit.api.LogEntry;
051import org.nuxeo.elasticsearch.ElasticSearchConstants;
052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
053import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
054import org.nuxeo.runtime.api.Framework;
055
056/**
057 * Override the JPA audit based change finder to execute query in ES.
058 * <p>
059 * The structure of the query executed by the {@link AuditChangeFinder} is:
060 *
061 * <pre>
062 * from LogEntry log where log.repositoryId = :repositoryId
063 * 
064 * + AND if ActiveRoots (activeRoots) NOT empty
065 * 
066 * from LogEntry log where log.repositoryId = :repositoryId and (
067 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or
068 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
069 * 
070 * 
071 * if ActiveRoots EMPTY:
072 * 
073 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
074 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
075 * 
076 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
077 * log.repositoryId asc, log.eventDate desc
078 * </pre>
079 *
080 * @since 7.3
081 */
082public class ESAuditChangeFinder extends AuditChangeFinder {
083
084    private static final long serialVersionUID = 1L;
085
086    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
087
088    protected Client esClient = null;
089
090    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
091            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
092
093        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
094                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
095                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
096
097        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
098        FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
099                upperBound, integerBounds, limit);
100        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
101
102        builder.addSort("repositoryId", SortOrder.ASC);
103        builder.addSort("eventDate", SortOrder.DESC);
104
105        List<LogEntry> entries = new ArrayList<>();
106        SearchResponse searchResponse = builder.setSize(limit).execute().actionGet();
107        for (SearchHit hit : searchResponse.getHits()) {
108            try {
109                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
110            } catch (IOException e) {
111                log.error("Error while reading Audit Entry from ES", e);
112            }
113        }
114        return entries;
115    }
116
117    protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
118            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
119        AndFilterBuilder filterBuilder = FilterBuilders.andFilter();
120
121        // from LogEntry log where log.repositoryId = :repositoryId
122        FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName());
123        filterBuilder.add(repositoryClauseFilter);
124
125        if (activeRoots.getPaths().isEmpty()) {
126            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
127            filterBuilder.add(getDriveLogsQueryClause());
128        } else {
129
130            OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter();
131
132            // LIST_DOC_EVENTS_IDS_QUERY
133
134            // (log.category = 'eventDocumentCategory' and (log.eventId =
135            // 'documentCreated' or log.eventId = 'documentModified' or
136            // log.eventId = 'documentMoved' or log.eventId =
137            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
138            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
139            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
140            // 'eventLifeCycleCategory' and log.eventId =
141            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
142            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
143                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
144                    "documentUnlocked" };
145            OrFilterBuilder orEventsFilter = FilterBuilders.orFilter();
146            orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true));
147            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" },
148                    true));
149            orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
150
151            // ROOT_PATHS log.docPath like :rootPath1
152            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
153                OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter();
154                rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths()));
155                rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds));
156
157                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
158                // COLECTIONS_PATHS)
159                // or (log.category = 'NuxeoDrive' and log.eventId !=
160                // 'rootUnregistered') )
161                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter));
162            } else {
163                orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter,
164                        getCurrentRootsClause(activeRoots.getPaths())));
165            }
166
167            orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause());
168
169            filterBuilder.add(orFilterBuilderIfActiveRoots);
170        }
171
172        filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound));
173        return filterBuilder;
174
175    }
176
177    protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
178        RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id");
179        rangeFilter.gt(lowerBound);
180        rangeFilter.lte(upperBound);
181        return rangeFilter;
182    }
183
184    protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
185        return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds);
186    }
187
188    protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) {
189        OrFilterBuilder orFilterRoots = FilterBuilders.orFilter();
190        for (String rootPath : rootPaths) {
191            orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath));
192        }
193        return orFilterRoots;
194    }
195
196    protected BoolFilterBuilder getDriveLogsQueryClause() {
197        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
198        filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive"));
199        filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered"));
200        return filterBuilder;
201    }
202
203    protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
204        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
205        filterBuilder.must(FilterBuilders.termFilter("category", category));
206        if (eventIds != null && eventIds.length > 0) {
207            if (eventIds.length == 1) {
208                if (shouldMatch) {
209                    filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
210                } else {
211                    filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0]));
212                }
213            } else {
214                if (shouldMatch) {
215                    filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
216                } else {
217                    filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds));
218                }
219            }
220        }
221        return filterBuilder;
222    }
223
224    @Override
225    public long getUpperBound() {
226        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
227                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
228                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
229        // TODO refactor this to use max clause
230        builder.setQuery(QueryBuilders.matchAllQuery());
231        builder.addSort("id", SortOrder.DESC);
232        builder.setSize(1);
233        SearchResponse searchResponse = builder.execute().actionGet();
234        List<LogEntry> entries = new ArrayList<>();
235        SearchHits hits = searchResponse.getHits();
236        for (SearchHit hit : hits) {
237            try {
238                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
239            } catch (IOException e) {
240                log.error("Error while reading Audit Entry from ES", e);
241            }
242        }
243        return entries.size() > 0 ? entries.get(0).getId() : -1;
244    }
245
246    /**
247     * Returns the last available log id in the audit index considering events older than the last clustering
248     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
249     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
250     */
251    @Override
252    public long getUpperBound(Set<String> repositoryNames) {
253        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
254                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
255                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
256        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
257        long clusteringDelay = getClusteringDelay(repositoryNames);
258        if (clusteringDelay > -1) {
259            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
260            FilterBuilder filterBuilder = FilterBuilders.rangeFilter("logDate").lt(
261                    new Date(lastClusteringInvalidationDate));
262            builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
263        } else {
264            builder.setQuery(queryBuilder);
265        }
266        builder.addSort("id", SortOrder.DESC);
267        builder.setSize(1);
268        SearchResponse searchResponse = builder.execute().actionGet();
269        List<LogEntry> entries = new ArrayList<>();
270        SearchHits hits = searchResponse.getHits();
271        for (SearchHit hit : hits) {
272            try {
273                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
274            } catch (IOException e) {
275                log.error("Error while reading Audit Entry from ES", e);
276            }
277        }
278        if (entries.isEmpty()) {
279            if (clusteringDelay > -1) {
280                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
281                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
282                // 0
283                builder.setQuery(queryBuilder);
284                searchResponse = builder.execute().actionGet();
285                if (searchResponse.getHits().iterator().hasNext()) {
286                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
287                    return 0;
288                }
289            }
290            log.debug("Found no audit log entries, returning -1");
291            return -1;
292        }
293        return entries.get(0).getId();
294    }
295
296    @Override
297    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
298            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
299        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
300                upperBound, integerBounds, limit);
301        // Post filter the output to remove (un)registration that are unrelated
302        // to the current user.
303        // TODO move this to the ES query
304        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
305        String principalName = session.getPrincipal().getName();
306        for (LogEntry entry : entries) {
307            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
308            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
309                // ignore event that only impact other users
310                continue;
311            }
312            if (log.isDebugEnabled()) {
313                if (log.isDebugEnabled()) {
314                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
315                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
316                            entry.getDocPath()));
317                }
318            }
319            postFilteredEntries.add(entry);
320        }
321        return postFilteredEntries;
322    }
323
324    protected Client getClient() {
325        if (esClient == null) {
326            log.info("Activate Elasticsearch backend for Audit");
327            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
328            esClient = esa.getClient();
329        }
330        return esClient;
331    }
332
333    protected String getESIndexName() {
334        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
335        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
336    }
337}