001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020package org.nuxeo.drive.elasticsearch;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026
027import com.fasterxml.jackson.databind.ObjectMapper;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequest;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.index.query.BoolQueryBuilder;
034import org.elasticsearch.index.query.QueryBuilder;
035import org.elasticsearch.index.query.QueryBuilders;
036import org.elasticsearch.index.query.RangeQueryBuilder;
037import org.elasticsearch.index.query.TermsQueryBuilder;
038import org.elasticsearch.search.SearchHit;
039import org.elasticsearch.search.SearchHits;
040import org.elasticsearch.search.builder.SearchSourceBuilder;
041import org.elasticsearch.search.sort.SortOrder;
042import org.nuxeo.drive.service.SynchronizationRoots;
043import org.nuxeo.drive.service.impl.AuditChangeFinder;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
048import org.nuxeo.elasticsearch.ElasticSearchConstants;
049import org.nuxeo.elasticsearch.api.ESClient;
050import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
051import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * Override the JPA audit based change finder to execute query in ES.
056 * <p>
057 * The structure of the query executed by the {@link AuditChangeFinder} is:
058 *
059 * <pre>
060 * from LogEntry log where log.repositoryId = :repositoryId
061 *
062 * + AND if ActiveRoots (activeRoots) NOT empty
063 *
064 * from LogEntry log where log.repositoryId = :repositoryId and (
065 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or
066 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
067 *
068 *
069 * if ActiveRoots EMPTY:
070 *
071 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
072 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
073 *
074 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
075 * log.repositoryId asc, log.eventDate desc
076 * </pre>
077 *
078 * @since 7.3
079 */
080public class ESAuditChangeFinder extends AuditChangeFinder {
081
082    private static final long serialVersionUID = 1L;
083
084    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
085
086    protected ESClient esClient = null;
087
088    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
089            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
090            int limit) {
091
092        SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
093                                                  .searchType(SearchType.DFS_QUERY_THEN_FETCH);
094
095        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
096        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
097                upperBound, integerBounds, limit);
098        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
099        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
100        source.size(limit);
101        request.source(source);
102        List<LogEntry> entries = new ArrayList<>();
103        logSearchRequest(request);
104        SearchResponse searchResponse = getClient().search(request);
105        logSearchResponse(searchResponse);
106        ObjectMapper mapper = new ObjectMapper();
107        for (SearchHit hit : searchResponse.getHits()) {
108            try {
109                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
110            } catch (IOException e) {
111                log.error("Error while reading Audit Entry from ES", e);
112            }
113        }
114        return entries;
115    }
116
117    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
118            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
119            int limit) {
120        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
121
122        // from LogEntry log where log.repositoryId = :repositoryId
123        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
124        filterBuilder.must(repositoryClauseFilter);
125
126        if (activeRoots.getPaths().isEmpty()) {
127            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
128            filterBuilder.must(getDriveLogsQueryClause());
129        } else {
130
131            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
132
133            // LIST_DOC_EVENTS_IDS_QUERY
134
135            // (log.category = 'eventDocumentCategory' and (log.eventId =
136            // 'documentCreated' or log.eventId = 'documentModified' or
137            // log.eventId = 'documentMoved' or log.eventId =
138            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
139            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
140            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
141            // 'eventLifeCycleCategory' and log.eventId =
142            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
143            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
144                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
145                    "documentUnlocked" };
146            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
147            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
148            orEventsFilter.should(
149                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
150            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
151
152            // ROOT_PATHS log.docPath like :rootPath1
153            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
154                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
155                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
156                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
157
158                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
159                // COLECTIONS_PATHS)
160                // or (log.category = 'NuxeoDrive' and log.eventId !=
161                // 'rootUnregistered') )
162                orFilterBuilderIfActiveRoots.should(
163                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
164            } else {
165                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
166                        getCurrentRootsClause(activeRoots.getPaths())));
167            }
168
169            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
170
171            filterBuilder.must(orFilterBuilderIfActiveRoots);
172        }
173
174        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
175        return filterBuilder;
176
177    }
178
179    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
180        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
181        rangeFilter.gt(lowerBound);
182        rangeFilter.lte(upperBound);
183        return rangeFilter;
184    }
185
186    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
187        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
188    }
189
190    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
191        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
192        for (String rootPath : rootPaths) {
193            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
194        }
195        return orFilterRoots;
196    }
197
198    protected BoolQueryBuilder getDriveLogsQueryClause() {
199        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
200        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
201        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
202        return filterBuilder;
203    }
204
205    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
206        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
207        filterBuilder.must(QueryBuilders.termQuery("category", category));
208        if (eventIds != null && eventIds.length > 0) {
209            if (eventIds.length == 1) {
210                if (shouldMatch) {
211                    filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
212                } else {
213                    filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0]));
214                }
215            } else {
216                if (shouldMatch) {
217                    filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
218                } else {
219                    filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds));
220                }
221            }
222        }
223        return filterBuilder;
224    }
225
226    @Override
227    public long getUpperBound() {
228        SearchRequest request = new SearchRequest(getESIndexName())
229                .types(ElasticSearchConstants.ENTRY_TYPE)
230                .searchType(SearchType.DFS_QUERY_THEN_FETCH);
231        // TODO refactor this to use max clause
232        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
233                .sort("id", SortOrder.DESC).size(1));
234        logSearchRequest(request);
235        SearchResponse searchResponse = getClient().search(request);
236        logSearchResponse(searchResponse);
237        List<LogEntry> entries = new ArrayList<>();
238        SearchHits hits = searchResponse.getHits();
239        ObjectMapper mapper = new ObjectMapper();
240        for (SearchHit hit : hits) {
241            try {
242                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
243            } catch (IOException e) {
244                log.error("Error while reading Audit Entry from ES", e);
245            }
246        }
247        return entries.size() > 0 ? entries.get(0).getId() : -1;
248    }
249
250    /**
251     * Returns the last available log id in the audit index considering events older than the last clustering
252     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
253     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
254     */
255    @Override
256    public long getUpperBound(Set<String> repositoryNames) {
257        SearchRequest request = new SearchRequest(getESIndexName())
258                .types(ElasticSearchConstants.ENTRY_TYPE)
259                .searchType(SearchType.DFS_QUERY_THEN_FETCH);
260        SearchSourceBuilder source = new SearchSourceBuilder();
261        long clusteringDelay = getClusteringDelay(repositoryNames);
262        if (clusteringDelay > -1) {
263            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
264            RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate")
265                                                           .lt(lastClusteringInvalidationDate);
266            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
267        } else {
268            source.query(QueryBuilders.matchAllQuery());
269        }
270        source.sort("id", SortOrder.DESC).size(1);
271        request.source(source);
272        logSearchRequest(request);
273        SearchResponse searchResponse = getClient().search(request);
274        logSearchResponse(searchResponse);
275        List<LogEntry> entries = new ArrayList<>();
276        SearchHits hits = searchResponse.getHits();
277        ObjectMapper mapper = new ObjectMapper();
278        for (SearchHit hit : hits) {
279            try {
280                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
281            } catch (IOException e) {
282                log.error("Error while reading Audit Entry from ES", e);
283            }
284        }
285        if (entries.isEmpty()) {
286            if (clusteringDelay > -1) {
287                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
288                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
289                // 0
290                source.query(QueryBuilders.matchAllQuery());
291                request.source(source);
292                logSearchRequest(request);
293                searchResponse = getClient().search(request);
294                logSearchResponse(searchResponse);
295                if (searchResponse.getHits().iterator().hasNext()) {
296                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
297                    return 0;
298                }
299            }
300            log.debug("Found no audit log entries, returning -1");
301            return -1;
302        }
303        return entries.get(0).getId();
304    }
305
306    @Override
307    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
308            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
309            int limit) {
310        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
311                upperBound, integerBounds, limit);
312        // Post filter the output to remove (un)registration that are unrelated
313        // to the current user.
314        // TODO move this to the ES query
315        List<LogEntry> postFilteredEntries = new ArrayList<>();
316        String principalName = session.getPrincipal().getName();
317        for (LogEntry entry : entries) {
318            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
319            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
320                // ignore event that only impact other users
321                continue;
322            }
323            if (log.isDebugEnabled()) {
324                if (log.isDebugEnabled()) {
325                    log.debug(String.format("Change detected: %s", entry));
326                }
327            }
328            postFilteredEntries.add(entry);
329        }
330        return postFilteredEntries;
331    }
332
333    protected ESClient getClient() {
334        if (esClient == null) {
335            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
336            esClient = esa.getClient();
337        }
338        return esClient;
339    }
340
341    protected String getESIndexName() {
342        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
343        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
344    }
345
346    protected void logSearchRequest(SearchRequest request) {
347        if (log.isDebugEnabled()) {
348            log.debug(String.format(
349                    "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
350                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
351        }
352    }
353
354    protected void logSearchResponse(SearchResponse response) {
355        if (log.isDebugEnabled()) {
356            log.debug("Elasticsearch search response: " + response.toString());
357        }
358    }
359}