001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.drive.elasticsearch;
022
023import java.io.IOException;
024import java.time.ZonedDateTime;
025import java.time.temporal.ChronoUnit;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.elasticsearch.action.search.SearchRequest;
034import org.elasticsearch.action.search.SearchResponse;
035import org.elasticsearch.action.search.SearchType;
036import org.elasticsearch.index.query.BoolQueryBuilder;
037import org.elasticsearch.index.query.QueryBuilder;
038import org.elasticsearch.index.query.QueryBuilders;
039import org.elasticsearch.index.query.RangeQueryBuilder;
040import org.elasticsearch.index.query.TermsQueryBuilder;
041import org.elasticsearch.search.SearchHit;
042import org.elasticsearch.search.SearchHits;
043import org.elasticsearch.search.builder.SearchSourceBuilder;
044import org.elasticsearch.search.sort.SortOrder;
045import org.nuxeo.drive.service.SynchronizationRoots;
046import org.nuxeo.drive.service.impl.AuditChangeFinder;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.repository.RepositoryManager;
049import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
050import org.nuxeo.ecm.platform.audit.api.LogEntry;
051import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
052import org.nuxeo.elasticsearch.ElasticSearchConstants;
053import org.nuxeo.elasticsearch.api.ESClient;
054import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
055import org.nuxeo.runtime.api.Framework;
056
057import com.fasterxml.jackson.databind.ObjectMapper;
058
059/**
060 * Override the JPA audit based change finder to execute query in ES.
061 * <p>
062 * The structure of the query executed by the {@link AuditChangeFinder} is:
063 *
064 * <pre>
065 * from LogEntry log where log.repositoryId = :repositoryId
066 *
067 * + AND if ActiveRoots (activeRoots) NOT empty
068 *
069 * from LogEntry log where log.repositoryId = :repositoryId and (
 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS) or
071 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
072 *
073 *
074 * if ActiveRoots EMPTY:
075 *
076 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
077 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
078 *
079 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
080 * log.repositoryId asc, log.eventDate desc
081 * </pre>
082 *
083 * @since 7.3
084 */
085public class ESAuditChangeFinder extends AuditChangeFinder {
086
087    private static final long serialVersionUID = 1L;
088
089    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
090
091    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
092            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
093            int limit) {
094
095        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
096                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
097
098        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
099        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
100                upperBound, integerBounds, limit);
101        SearchSourceBuilder source = new SearchSourceBuilder().query(
102                QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
103        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
104        source.size(limit);
105        request.source(source);
106        List<LogEntry> entries = new ArrayList<>();
107        logSearchRequest(request);
108        SearchResponse searchResponse = getClient().search(request);
109        logSearchResponse(searchResponse);
110        ObjectMapper mapper = new ObjectMapper();
111        for (SearchHit hit : searchResponse.getHits()) {
112            try {
113                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
114            } catch (IOException e) {
115                log.error("Error while reading Audit Entry from ES", e);
116            }
117        }
118        return entries;
119    }
120
121    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
122            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
123            int limit) {
124        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
125
126        // from LogEntry log where log.repositoryId = :repositoryId
127        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
128        filterBuilder.must(repositoryClauseFilter);
129
130        if (activeRoots.getPaths().isEmpty()) {
131            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
132            filterBuilder.must(getDriveLogsQueryClause());
133        } else {
134
135            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
136
137            // LIST_DOC_EVENTS_IDS_QUERY
138
139            // (log.category = 'eventDocumentCategory' and (log.eventId =
140            // 'documentCreated' or log.eventId = 'documentModified' or
141            // log.eventId = 'documentMoved' or log.eventId =
142            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
143            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
144            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
145            // 'eventLifeCycleCategory' and log.eventId =
146            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
147            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
148                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
149                    "documentUnlocked" };
150            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
151            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
152            orEventsFilter.should(
153                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
154            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
155
156            // ROOT_PATHS log.docPath like :rootPath1
157            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
158                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
159                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
160                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
161
162                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
163                // COLECTIONS_PATHS)
164                // or (log.category = 'NuxeoDrive' and log.eventId !=
165                // 'rootUnregistered') )
166                orFilterBuilderIfActiveRoots.should(
167                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
168            } else {
169                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
170                        getCurrentRootsClause(activeRoots.getPaths())));
171            }
172
173            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
174
175            filterBuilder.must(orFilterBuilderIfActiveRoots);
176        }
177
178        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
179        return filterBuilder;
180
181    }
182
183    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
184        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
185        rangeFilter.gt(lowerBound);
186        rangeFilter.lte(upperBound);
187        return rangeFilter;
188    }
189
190    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
191        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
192    }
193
194    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
195        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
196        for (String rootPath : rootPaths) {
197            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
198        }
199        return orFilterRoots;
200    }
201
202    protected BoolQueryBuilder getDriveLogsQueryClause() {
203        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
204        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
205        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
206        return filterBuilder;
207    }
208
209    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
210        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
211        filterBuilder.must(QueryBuilders.termQuery("category", category));
212        if (eventIds != null && eventIds.length > 0) {
213            if (eventIds.length == 1) {
214                if (shouldMatch) {
215                    filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
216                } else {
217                    filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0]));
218                }
219            } else {
220                if (shouldMatch) {
221                    filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
222                } else {
223                    filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds));
224                }
225            }
226        }
227        return filterBuilder;
228    }
229
230    @Override
231    public long getUpperBound() {
232        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
233        return getUpperBound(new HashSet<>(repositoryManager.getRepositoryNames()));
234    }
235
236    /**
237     * Returns the last available log id in the audit index considering events older than the last clustering
238     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
239     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
240     */
241    @Override
242    public long getUpperBound(Set<String> repositoryNames) {
243        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
244                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
245        RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate");
246        long clusteringDelay = getClusteringDelay(repositoryNames);
247        if (clusteringDelay > -1) {
248            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
249            filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate);
250        }
251        SearchSourceBuilder source = new SearchSourceBuilder();
252        source.sort("id", SortOrder.DESC).size(1);
253        // scroll on previous days with a times 2 step up to 32
254        ESClient esClient = getClient();
255        for (int i = 1; i <= 32; i = i * 2) {
256            ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i);
257            // set lower bound in query
258            filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli());
259            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
260            request.source(source);
261            // run request
262            logSearchRequest(request);
263            SearchResponse searchResponse = esClient.search(request);
264            logSearchResponse(searchResponse);
265
266            // if results return the first hit id
267            ObjectMapper mapper = new ObjectMapper();
268            SearchHits hits = searchResponse.getHits();
269            for (SearchHit hit : hits) {
270                try {
271                    return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId();
272                } catch (IOException e) {
273                    log.error("Error while reading Audit Entry from ES", e);
274                }
275            }
276        }
277        if (clusteringDelay > -1) {
278            // Check for existing entries without the clustering invalidation date filter to not return -1 in this
279            // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0
280            source.query(QueryBuilders.matchAllQuery()).size(0);
281            request.source(source);
282            logSearchRequest(request);
283            SearchResponse searchResponse = esClient.search(request);
284            logSearchResponse(searchResponse);
285            if (searchResponse.getHits().getTotalHits() > 0) {
286                log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
287                return 0;
288            }
289        }
290        log.debug("Found no audit log entries, returning -1");
291        return -1;
292    }
293
294    @Override
295    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
296            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
297            int limit) {
298        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
299                upperBound, integerBounds, limit);
300        // Post filter the output to remove (un)registration that are unrelated
301        // to the current user.
302        // TODO move this to the ES query
303        List<LogEntry> postFilteredEntries = new ArrayList<>();
304        String principalName = session.getPrincipal().getName();
305        for (LogEntry entry : entries) {
306            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
307            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
308                // ignore event that only impact other users
309                continue;
310            }
311            if (log.isDebugEnabled()) {
312                if (log.isDebugEnabled()) {
313                    log.debug(String.format("Change detected: %s", entry));
314                }
315            }
316            postFilteredEntries.add(entry);
317        }
318        return postFilteredEntries;
319    }
320
321    protected ESClient getClient() {
322        return Framework.getService(ElasticSearchAdmin.class).getClient();
323    }
324
325    protected String getESIndexName() {
326        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
327        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
328    }
329
330    protected void logSearchRequest(SearchRequest request) {
331        if (log.isDebugEnabled()) {
332            log.debug(String.format(
333                    "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
334                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
335        }
336    }
337
338    protected void logSearchResponse(SearchResponse response) {
339        if (log.isDebugEnabled()) {
340            log.debug("Elasticsearch search response: " + response.toString());
341        }
342    }
343}