001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020package org.nuxeo.drive.elasticsearch;
021
022import java.io.IOException;
023import java.time.ZonedDateTime;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequest;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.index.query.BoolQueryBuilder;
034import org.elasticsearch.index.query.QueryBuilder;
035import org.elasticsearch.index.query.QueryBuilders;
036import org.elasticsearch.index.query.RangeQueryBuilder;
037import org.elasticsearch.index.query.TermsQueryBuilder;
038import org.elasticsearch.search.SearchHit;
039import org.elasticsearch.search.SearchHits;
040import org.elasticsearch.search.builder.SearchSourceBuilder;
041import org.elasticsearch.search.sort.SortOrder;
042import org.nuxeo.drive.service.SynchronizationRoots;
043import org.nuxeo.drive.service.impl.AuditChangeFinder;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
048import org.nuxeo.elasticsearch.ElasticSearchConstants;
049import org.nuxeo.elasticsearch.api.ESClient;
050import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
051import org.nuxeo.runtime.api.Framework;
052
053import com.fasterxml.jackson.databind.ObjectMapper;
054
055/**
056 * Override the JPA audit based change finder to execute query in ES.
057 * <p>
058 * The structure of the query executed by the {@link AuditChangeFinder} is:
059 *
060 * <pre>
061 * from LogEntry log where log.repositoryId = :repositoryId
062 *
063 * + AND if ActiveRoots (activeRoots) NOT empty
064 *
065 * from LogEntry log where log.repositoryId = :repositoryId and (
 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS) or
067 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
068 *
069 *
070 * if ActiveRoots EMPTY:
071 *
072 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
073 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
074 *
075 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
076 * log.repositoryId asc, log.eventDate desc
077 * </pre>
078 *
079 * @since 7.3
080 */
081public class ESAuditChangeFinder extends AuditChangeFinder {
082
083    private static final long serialVersionUID = 1L;
084
085    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
086
087    protected ESClient esClient = null;
088
089    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
090            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
091            int limit) {
092
093        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
094                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
095
096        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
097        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
098                upperBound, integerBounds, limit);
099        SearchSourceBuilder source = new SearchSourceBuilder().query(
100                QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
101        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
102        source.size(limit);
103        request.source(source);
104        List<LogEntry> entries = new ArrayList<>();
105        logSearchRequest(request);
106        SearchResponse searchResponse = getClient().search(request);
107        logSearchResponse(searchResponse);
108        ObjectMapper mapper = new ObjectMapper();
109        for (SearchHit hit : searchResponse.getHits()) {
110            try {
111                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
112            } catch (IOException e) {
113                log.error("Error while reading Audit Entry from ES", e);
114            }
115        }
116        return entries;
117    }
118
119    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
120            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
121            int limit) {
122        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
123
124        // from LogEntry log where log.repositoryId = :repositoryId
125        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
126        filterBuilder.must(repositoryClauseFilter);
127
128        if (activeRoots.getPaths().isEmpty()) {
129            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
130            filterBuilder.must(getDriveLogsQueryClause());
131        } else {
132
133            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
134
135            // LIST_DOC_EVENTS_IDS_QUERY
136
137            // (log.category = 'eventDocumentCategory' and (log.eventId =
138            // 'documentCreated' or log.eventId = 'documentModified' or
139            // log.eventId = 'documentMoved' or log.eventId =
140            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
141            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
142            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
143            // 'eventLifeCycleCategory' and log.eventId =
144            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
145            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
146                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
147                    "documentUnlocked" };
148            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
149            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
150            orEventsFilter.should(
151                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
152            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
153
154            // ROOT_PATHS log.docPath like :rootPath1
155            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
156                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
157                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
158                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
159
160                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
161                // COLECTIONS_PATHS)
162                // or (log.category = 'NuxeoDrive' and log.eventId !=
163                // 'rootUnregistered') )
164                orFilterBuilderIfActiveRoots.should(
165                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
166            } else {
167                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
168                        getCurrentRootsClause(activeRoots.getPaths())));
169            }
170
171            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
172
173            filterBuilder.must(orFilterBuilderIfActiveRoots);
174        }
175
176        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
177        return filterBuilder;
178
179    }
180
181    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
182        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
183        rangeFilter.gt(lowerBound);
184        rangeFilter.lte(upperBound);
185        return rangeFilter;
186    }
187
188    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
189        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
190    }
191
192    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
193        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
194        for (String rootPath : rootPaths) {
195            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
196        }
197        return orFilterRoots;
198    }
199
200    protected BoolQueryBuilder getDriveLogsQueryClause() {
201        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
202        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
203        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
204        return filterBuilder;
205    }
206
207    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
208        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
209        filterBuilder.must(QueryBuilders.termQuery("category", category));
210        if (eventIds != null && eventIds.length > 0) {
211            if (eventIds.length == 1) {
212                if (shouldMatch) {
213                    filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
214                } else {
215                    filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0]));
216                }
217            } else {
218                if (shouldMatch) {
219                    filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
220                } else {
221                    filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds));
222                }
223            }
224        }
225        return filterBuilder;
226    }
227
228    @Override
229    public long getUpperBound() {
230        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
231                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
232        // TODO refactor this to use max clause
233        request.source(
234                new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).sort("id", SortOrder.DESC).size(1));
235        logSearchRequest(request);
236        SearchResponse searchResponse = getClient().search(request);
237        logSearchResponse(searchResponse);
238        List<LogEntry> entries = new ArrayList<>();
239        SearchHits hits = searchResponse.getHits();
240        ObjectMapper mapper = new ObjectMapper();
241        for (SearchHit hit : hits) {
242            try {
243                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
244            } catch (IOException e) {
245                log.error("Error while reading Audit Entry from ES", e);
246            }
247        }
248        return entries.size() > 0 ? entries.get(0).getId() : -1;
249    }
250
251    /**
252     * Returns the last available log id in the audit index considering events older than the last clustering
253     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
254     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
255     */
256    @Override
257    public long getUpperBound(Set<String> repositoryNames) {
258        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
259                                                                   .searchType(SearchType.DFS_QUERY_THEN_FETCH);
260        RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate");
261        long clusteringDelay = getClusteringDelay(repositoryNames);
262        if (clusteringDelay > -1) {
263            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
264            filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate);
265        }
266        SearchSourceBuilder source = new SearchSourceBuilder();
267        source.sort("id", SortOrder.DESC).size(1);
268        // scroll on previous days with a times 2 step up to 32
269        for (int i = 1; i <= 32; i = i * 2) {
270            ZonedDateTime lowerLogDateTime = ZonedDateTime.now().minusDays(i);
271            // set lower bound in query
272            filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli());
273            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
274            request.source(source);
275            // run request
276            logSearchRequest(request);
277            SearchResponse searchResponse = getClient().search(request);
278            logSearchResponse(searchResponse);
279
280            // if results return the first hit id
281            ObjectMapper mapper = new ObjectMapper();
282            SearchHits hits = searchResponse.getHits();
283            for (SearchHit hit : hits) {
284                try {
285                    return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId();
286                } catch (IOException e) {
287                    log.error("Error while reading Audit Entry from ES", e);
288                }
289            }
290        }
291        if (clusteringDelay > -1) {
292            // Check for existing entries without the clustering invalidation date filter to not return -1 in this
293            // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0
294            source.query(QueryBuilders.matchAllQuery()).size(0);
295            request.source(source);
296            logSearchRequest(request);
297            SearchResponse searchResponse = getClient().search(request);
298            logSearchResponse(searchResponse);
299            if (searchResponse.getHits().getTotalHits() > 0) {
300                log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
301                return 0;
302            }
303        }
304        log.debug("Found no audit log entries, returning -1");
305        return -1;
306    }
307
308    @Override
309    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
310            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
311            int limit) {
312        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
313                upperBound, integerBounds, limit);
314        // Post filter the output to remove (un)registration that are unrelated
315        // to the current user.
316        // TODO move this to the ES query
317        List<LogEntry> postFilteredEntries = new ArrayList<>();
318        String principalName = session.getPrincipal().getName();
319        for (LogEntry entry : entries) {
320            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
321            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
322                // ignore event that only impact other users
323                continue;
324            }
325            if (log.isDebugEnabled()) {
326                if (log.isDebugEnabled()) {
327                    log.debug(String.format("Change detected: %s", entry));
328                }
329            }
330            postFilteredEntries.add(entry);
331        }
332        return postFilteredEntries;
333    }
334
335    protected ESClient getClient() {
336        if (esClient == null) {
337            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
338            esClient = esa.getClient();
339        }
340        return esClient;
341    }
342
343    protected String getESIndexName() {
344        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
345        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
346    }
347
348    protected void logSearchRequest(SearchRequest request) {
349        if (log.isDebugEnabled()) {
350            log.debug(String.format(
351                    "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
352                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
353        }
354    }
355
356    protected void logSearchResponse(SearchResponse response) {
357        if (log.isDebugEnabled()) {
358            log.debug("Elasticsearch search response: " + response.toString());
359        }
360    }
361}