001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.drive.elasticsearch;
022
023import java.io.IOException;
024import java.time.ZonedDateTime;
025import java.time.temporal.ChronoUnit;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Set;
029
030import org.apache.logging.log4j.LogManager;
031import org.apache.logging.log4j.Logger;
032import org.elasticsearch.action.search.SearchRequest;
033import org.elasticsearch.action.search.SearchResponse;
034import org.elasticsearch.action.search.SearchType;
035import org.elasticsearch.index.query.BoolQueryBuilder;
036import org.elasticsearch.index.query.QueryBuilder;
037import org.elasticsearch.index.query.QueryBuilders;
038import org.elasticsearch.index.query.RangeQueryBuilder;
039import org.elasticsearch.index.query.TermsQueryBuilder;
040import org.elasticsearch.search.SearchHit;
041import org.elasticsearch.search.SearchHits;
042import org.elasticsearch.search.builder.SearchSourceBuilder;
043import org.elasticsearch.search.sort.SortOrder;
044import org.nuxeo.drive.service.SynchronizationRoots;
045import org.nuxeo.drive.service.impl.AuditChangeFinder;
046import org.nuxeo.ecm.core.api.CoreSession;
047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
048import org.nuxeo.ecm.platform.audit.api.LogEntry;
049import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
050import org.nuxeo.elasticsearch.ElasticSearchConstants;
051import org.nuxeo.elasticsearch.api.ESClient;
052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
053import org.nuxeo.runtime.api.Framework;
054
055import com.fasterxml.jackson.databind.ObjectMapper;
056
057/**
058 * Override the JPA audit based change finder to execute query in ES.
059 * <p>
060 * The structure of the query executed by the {@link AuditChangeFinder} is:
061 *
062 * <pre>
063 * {@code
064 * from LogEntry log where log.repositoryId = :repositoryId
065 *
066 * + AND if ActiveRoots (activeRoots) NOT empty
067 *
068 * from LogEntry log where log.repositoryId = :repositoryId and (
069 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or
070 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
071 *
072 *
073 * if ActiveRoots EMPTY:
074 *
075 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
076 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
077 *
078 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
079 * log.repositoryId asc, log.eventDate desc
080 * }
081 * </pre>
082 *
083 * @since 7.3
084 */
085public class ESAuditChangeFinder extends AuditChangeFinder {
086
087    private static final Logger log = LogManager.getLogger(ESAuditChangeFinder.class);
088
089    protected static final String EVENT_ID = "eventId";
090
091    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
092            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
093
094        SearchRequest request = new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
095
096        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
097        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
098                upperBound);
099        SearchSourceBuilder source = new SearchSourceBuilder().query(
100                QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
101        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
102        source.size(limit);
103        request.source(source);
104        List<LogEntry> entries = new ArrayList<>();
105        logSearchRequest(request);
106        SearchResponse searchResponse = getClient().search(request);
107        logSearchResponse(searchResponse);
108        ObjectMapper mapper = new ObjectMapper();
109        for (SearchHit hit : searchResponse.getHits()) {
110            try {
111                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
112            } catch (IOException e) {
113                log.error("Error while reading Audit Entry from ES", e);
114            }
115        }
116        return entries;
117    }
118
119    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
120            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound) {
121        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
122
123        // from LogEntry log where log.repositoryId = :repositoryId
124        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
125        filterBuilder.must(repositoryClauseFilter);
126
127        if (activeRoots.getPaths().isEmpty()) {
128            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
129            filterBuilder.must(getDriveLogsQueryClause());
130        } else {
131
132            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
133
134            // LIST_DOC_EVENTS_IDS_QUERY
135
136            // (log.category = 'eventDocumentCategory' and (log.eventId =
137            // 'documentCreated' or log.eventId = 'documentModified' or
138            // log.eventId = 'documentMoved' or log.eventId =
139            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
140            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
141            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
142            // 'eventLifeCycleCategory' and log.eventId =
143            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
144            String[] eventIds = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
145                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
146                    "documentUnlocked", "documentUntrashed" };
147            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
148            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
149            orEventsFilter.should(
150                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
151            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
152
153            // ROOT_PATHS log.docPath like :rootPath1
154            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
155                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
156                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
157                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
158
159                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
160                // COLECTIONS_PATHS)
161                // or (log.category = 'NuxeoDrive' and log.eventId !=
162                // 'rootUnregistered') )
163                orFilterBuilderIfActiveRoots.should(
164                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
165            } else {
166                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
167                        getCurrentRootsClause(activeRoots.getPaths())));
168            }
169
170            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
171
172            filterBuilder.must(orFilterBuilderIfActiveRoots);
173        }
174
175        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
176        return filterBuilder;
177
178    }
179
180    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
181        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
182        rangeFilter.gt(lowerBound);
183        rangeFilter.lte(upperBound);
184        return rangeFilter;
185    }
186
187    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
188        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
189    }
190
191    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
192        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
193        for (String rootPath : rootPaths) {
194            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
195        }
196        return orFilterRoots;
197    }
198
199    protected BoolQueryBuilder getDriveLogsQueryClause() {
200        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
201        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
202        filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, "rootUnregistered"));
203        return filterBuilder;
204    }
205
206    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
207        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
208        filterBuilder.must(QueryBuilders.termQuery("category", category));
209        if (eventIds != null && eventIds.length > 0) {
210            if (eventIds.length == 1) {
211                if (shouldMatch) {
212                    filterBuilder.must(QueryBuilders.termQuery(EVENT_ID, eventIds[0]));
213                } else {
214                    filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, eventIds[0]));
215                }
216            } else {
217                if (shouldMatch) {
218                    filterBuilder.must(QueryBuilders.termsQuery(EVENT_ID, eventIds));
219                } else {
220                    filterBuilder.mustNot(QueryBuilders.termsQuery(EVENT_ID, eventIds));
221                }
222            }
223        }
224        return filterBuilder;
225    }
226
227    @Override
228    public long getUpperBound() {
229        SearchRequest request = new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
230        RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate");
231        SearchSourceBuilder source = new SearchSourceBuilder();
232        source.sort("id", SortOrder.DESC).size(1);
233        // scroll on previous days with a times 2 step up to 32
234        ESClient esClient = getClient();
235        for (int i = 1; i <= 32; i = i * 2) {
236            ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i);
237            // set lower bound in query
238            filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli());
239            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
240            request.source(source);
241            // run request
242            logSearchRequest(request);
243            SearchResponse searchResponse = esClient.search(request);
244            logSearchResponse(searchResponse);
245
246            // if results return the first hit id
247            ObjectMapper mapper = new ObjectMapper();
248            SearchHits hits = searchResponse.getHits();
249            for (SearchHit hit : hits) {
250                try {
251                    return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId();
252                } catch (IOException e) {
253                    log.error("Error while reading Audit Entry from ES", e);
254                }
255            }
256        }
257        log.debug("Found no audit log entries, returning -1");
258        return -1;
259    }
260
261    @Override
262    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
263            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
264        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
265                upperBound, limit);
266        // Post filter the output to remove (un)registration that are unrelated
267        // to the current user.
268        // TODO move this to the ES query
269        List<LogEntry> postFilteredEntries = new ArrayList<>();
270        String principalName = session.getPrincipal().getName();
271        for (LogEntry entry : entries) {
272            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
273            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
274                // ignore event that only impact other users
275                continue;
276            }
277            log.debug("Change detected: {}", entry);
278            postFilteredEntries.add(entry);
279        }
280        return postFilteredEntries;
281    }
282
283    protected ESClient getClient() {
284        return Framework.getService(ElasticSearchAdmin.class).getClient();
285    }
286
287    protected String getESIndexName() {
288        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
289        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
290    }
291
292    protected void logSearchRequest(SearchRequest request) {
293        log.debug("Elasticsearch search request: curl -XGET 'http://localhost:9200/{}/_search?pretty' -d '{}'",
294                this::getESIndexName, () -> request);
295    }
296
297    protected void logSearchResponse(SearchResponse response) {
298        log.debug("Elasticsearch search response: {}", response);
299    }
300}