001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.drive.elasticsearch;
022
023import java.io.IOException;
024import java.time.ZonedDateTime;
025import java.time.temporal.ChronoUnit;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030
031import org.apache.logging.log4j.LogManager;
032import org.apache.logging.log4j.Logger;
033import org.elasticsearch.action.search.SearchRequest;
034import org.elasticsearch.action.search.SearchResponse;
035import org.elasticsearch.action.search.SearchType;
036import org.elasticsearch.index.query.BoolQueryBuilder;
037import org.elasticsearch.index.query.QueryBuilder;
038import org.elasticsearch.index.query.QueryBuilders;
039import org.elasticsearch.index.query.RangeQueryBuilder;
040import org.elasticsearch.index.query.TermsQueryBuilder;
041import org.elasticsearch.search.SearchHit;
042import org.elasticsearch.search.SearchHits;
043import org.elasticsearch.search.builder.SearchSourceBuilder;
044import org.elasticsearch.search.sort.SortOrder;
045import org.nuxeo.drive.service.SynchronizationRoots;
046import org.nuxeo.drive.service.impl.AuditChangeFinder;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.repository.RepositoryManager;
049import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
050import org.nuxeo.ecm.platform.audit.api.LogEntry;
051import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
052import org.nuxeo.elasticsearch.ElasticSearchConstants;
053import org.nuxeo.elasticsearch.api.ESClient;
054import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
055import org.nuxeo.runtime.api.Framework;
056
057import com.fasterxml.jackson.databind.ObjectMapper;
058
059/**
060 * Override the JPA audit based change finder to execute query in ES.
061 * <p>
062 * The structure of the query executed by the {@link AuditChangeFinder} is:
063 *
064 * <pre>
065 * from LogEntry log where log.repositoryId = :repositoryId
066 *
067 * + AND if ActiveRoots (activeRoots) NOT empty
068 *
069 * from LogEntry log where log.repositoryId = :repositoryId and (
070 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or
071 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
072 *
073 *
074 * if ActiveRoots EMPTY:
075 *
076 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
077 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
078 *
079 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
080 * log.repositoryId asc, log.eventDate desc
081 * </pre>
082 *
083 * @since 7.3
084 */
085public class ESAuditChangeFinder extends AuditChangeFinder {
086
087    private static final Logger log = LogManager.getLogger(ESAuditChangeFinder.class);
088
089    protected static final String EVENT_ID = "eventId";
090
091    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
092            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
093
094        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
095                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
096
097        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
098        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
099                upperBound);
100        SearchSourceBuilder source = new SearchSourceBuilder().query(
101                QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
102        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
103        source.size(limit);
104        request.source(source);
105        List<LogEntry> entries = new ArrayList<>();
106        logSearchRequest(request);
107        SearchResponse searchResponse = getClient().search(request);
108        logSearchResponse(searchResponse);
109        ObjectMapper mapper = new ObjectMapper();
110        for (SearchHit hit : searchResponse.getHits()) {
111            try {
112                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
113            } catch (IOException e) {
114                log.error("Error while reading Audit Entry from ES", e);
115            }
116        }
117        return entries;
118    }
119
120    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
121            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound) {
122        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
123
124        // from LogEntry log where log.repositoryId = :repositoryId
125        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
126        filterBuilder.must(repositoryClauseFilter);
127
128        if (activeRoots.getPaths().isEmpty()) {
129            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
130            filterBuilder.must(getDriveLogsQueryClause());
131        } else {
132
133            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
134
135            // LIST_DOC_EVENTS_IDS_QUERY
136
137            // (log.category = 'eventDocumentCategory' and (log.eventId =
138            // 'documentCreated' or log.eventId = 'documentModified' or
139            // log.eventId = 'documentMoved' or log.eventId =
140            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
141            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
142            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
143            // 'eventLifeCycleCategory' and log.eventId =
144            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
145            String[] eventIds = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
146                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
147                    "documentUnlocked", "documentUntrashed" };
148            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
149            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
150            orEventsFilter.should(
151                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
152            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
153
154            // ROOT_PATHS log.docPath like :rootPath1
155            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
156                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
157                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
158                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
159
160                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
161                // COLECTIONS_PATHS)
162                // or (log.category = 'NuxeoDrive' and log.eventId !=
163                // 'rootUnregistered') )
164                orFilterBuilderIfActiveRoots.should(
165                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
166            } else {
167                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
168                        getCurrentRootsClause(activeRoots.getPaths())));
169            }
170
171            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
172
173            filterBuilder.must(orFilterBuilderIfActiveRoots);
174        }
175
176        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
177        return filterBuilder;
178
179    }
180
181    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
182        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
183        rangeFilter.gt(lowerBound);
184        rangeFilter.lte(upperBound);
185        return rangeFilter;
186    }
187
188    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
189        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
190    }
191
192    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
193        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
194        for (String rootPath : rootPaths) {
195            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
196        }
197        return orFilterRoots;
198    }
199
200    protected BoolQueryBuilder getDriveLogsQueryClause() {
201        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
202        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
203        filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, "rootUnregistered"));
204        return filterBuilder;
205    }
206
207    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
208        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
209        filterBuilder.must(QueryBuilders.termQuery("category", category));
210        if (eventIds != null && eventIds.length > 0) {
211            if (eventIds.length == 1) {
212                if (shouldMatch) {
213                    filterBuilder.must(QueryBuilders.termQuery(EVENT_ID, eventIds[0]));
214                } else {
215                    filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, eventIds[0]));
216                }
217            } else {
218                if (shouldMatch) {
219                    filterBuilder.must(QueryBuilders.termsQuery(EVENT_ID, eventIds));
220                } else {
221                    filterBuilder.mustNot(QueryBuilders.termsQuery(EVENT_ID, eventIds));
222                }
223            }
224        }
225        return filterBuilder;
226    }
227
228    @Override
229    public long getUpperBound() {
230        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
231        return getUpperBound(new HashSet<>(repositoryManager.getRepositoryNames()));
232    }
233
234    /**
235     * Returns the last available log id in the audit index considering events older than the last clustering
236     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
237     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
238     */
239    @Override
240    public long getUpperBound(Set<String> repositoryNames) {
241        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
242                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
243        RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate");
244        long clusteringDelay = getClusteringDelay(repositoryNames);
245        if (clusteringDelay > -1) {
246            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
247            filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate);
248        }
249        SearchSourceBuilder source = new SearchSourceBuilder();
250        source.sort("id", SortOrder.DESC).size(1);
251        // scroll on previous days with a times 2 step up to 32
252        ESClient esClient = getClient();
253        for (int i = 1; i <= 32; i = i * 2) {
254            ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i);
255            // set lower bound in query
256            filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli());
257            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
258            request.source(source);
259            // run request
260            logSearchRequest(request);
261            SearchResponse searchResponse = esClient.search(request);
262            logSearchResponse(searchResponse);
263
264            // if results return the first hit id
265            ObjectMapper mapper = new ObjectMapper();
266            SearchHits hits = searchResponse.getHits();
267            for (SearchHit hit : hits) {
268                try {
269                    return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId();
270                } catch (IOException e) {
271                    log.error("Error while reading Audit Entry from ES", e);
272                }
273            }
274        }
275        if (clusteringDelay > -1) {
276            // Check for existing entries without the clustering invalidation date filter to not return -1 in this
277            // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0
278            source.query(QueryBuilders.matchAllQuery()).size(0);
279            request.source(source);
280            logSearchRequest(request);
281            SearchResponse searchResponse = esClient.search(request);
282            logSearchResponse(searchResponse);
283            if (searchResponse.getHits().getTotalHits() > 0) {
284                log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
285                return 0;
286            }
287        }
288        log.debug("Found no audit log entries, returning -1");
289        return -1;
290    }
291
292    @Override
293    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
294            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
295        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
296                upperBound, limit);
297        // Post filter the output to remove (un)registration that are unrelated
298        // to the current user.
299        // TODO move this to the ES query
300        List<LogEntry> postFilteredEntries = new ArrayList<>();
301        String principalName = session.getPrincipal().getName();
302        for (LogEntry entry : entries) {
303            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
304            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
305                // ignore event that only impact other users
306                continue;
307            }
308            log.debug("Change detected: {}", entry);
309            postFilteredEntries.add(entry);
310        }
311        return postFilteredEntries;
312    }
313
314    protected ESClient getClient() {
315        return Framework.getService(ElasticSearchAdmin.class).getClient();
316    }
317
318    protected String getESIndexName() {
319        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
320        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
321    }
322
323    protected void logSearchRequest(SearchRequest request) {
324        log.debug("Elasticsearch search request: curl -XGET 'http://localhost:9200/{}/{}/_search?pretty' -d '{}'",
325                this::getESIndexName, () -> ElasticSearchConstants.ENTRY_TYPE, () -> request);
326    }
327
328    protected void logSearchResponse(SearchResponse response) {
329        log.debug("Elasticsearch search response: {}", response);
330    }
331}