001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Mariana Cedica <mcedica@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020package org.nuxeo.drive.elasticsearch;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequestBuilder;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.client.Client;
034import org.elasticsearch.index.query.BoolQueryBuilder;
035import org.elasticsearch.index.query.QueryBuilder;
036import org.elasticsearch.index.query.QueryBuilders;
037import org.elasticsearch.index.query.RangeQueryBuilder;
038import org.elasticsearch.index.query.TermsQueryBuilder;
039import org.elasticsearch.search.SearchHit;
040import org.elasticsearch.search.SearchHits;
041import org.elasticsearch.search.sort.SortOrder;
042import org.nuxeo.drive.service.SynchronizationRoots;
043import org.nuxeo.drive.service.impl.AuditChangeFinder;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.elasticsearch.ElasticSearchConstants;
048import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
049import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
050import org.nuxeo.runtime.api.Framework;
051
/**
 * Overrides the JPA audit based change finder to execute the query in Elasticsearch.
 * <p>
 * The structure of the query executed by the {@link AuditChangeFinder} is:
 *
 * <pre>
 * from LogEntry log where log.repositoryId = :repositoryId
 *
 * + if the active roots (activeRoots) are NOT empty:
 *
 *   and ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS )
 *         or (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
 *
 * + if the active roots are EMPTY:
 *
 *   and ( (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
 *
 * + and (log.id &gt; :lowerBound and log.id &lt;= :upperBound)
 *
 * + order by log.repositoryId asc, log.eventDate desc
 * </pre>
 *
 * @since 7.3
 */
078public class ESAuditChangeFinder extends AuditChangeFinder {
079
080    private static final long serialVersionUID = 1L;
081
082    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
083
084    protected Client esClient = null;
085
086    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
087            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
088            int limit) {
089
090        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
091                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
092                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
093
094        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
095        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
096                upperBound, integerBounds, limit);
097        builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
098
099        builder.addSort("repositoryId", SortOrder.ASC);
100        builder.addSort("eventDate", SortOrder.DESC);
101
102        List<LogEntry> entries = new ArrayList<>();
103        logSearchRequest(builder);
104        SearchResponse searchResponse = builder.setSize(limit).execute().actionGet();
105        logSearchResponse(searchResponse);
106        for (SearchHit hit : searchResponse.getHits()) {
107            try {
108                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
109            } catch (IOException e) {
110                log.error("Error while reading Audit Entry from ES", e);
111            }
112        }
113        return entries;
114    }
115
116    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
117            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
118            int limit) {
119        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
120
121        // from LogEntry log where log.repositoryId = :repositoryId
122        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
123        filterBuilder.must(repositoryClauseFilter);
124
125        if (activeRoots.getPaths().isEmpty()) {
126            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
127            filterBuilder.must(getDriveLogsQueryClause());
128        } else {
129
130            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
131
132            // LIST_DOC_EVENTS_IDS_QUERY
133
134            // (log.category = 'eventDocumentCategory' and (log.eventId =
135            // 'documentCreated' or log.eventId = 'documentModified' or
136            // log.eventId = 'documentMoved' or log.eventId =
137            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
138            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
139            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
140            // 'eventLifeCycleCategory' and log.eventId =
141            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
142            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
143                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
144                    "documentUnlocked" };
145            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
146            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
147            orEventsFilter.should(
148                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
149            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
150
151            // ROOT_PATHS log.docPath like :rootPath1
152            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
153                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
154                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
155                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
156
157                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
158                // COLECTIONS_PATHS)
159                // or (log.category = 'NuxeoDrive' and log.eventId !=
160                // 'rootUnregistered') )
161                orFilterBuilderIfActiveRoots.should(
162                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
163            } else {
164                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
165                        getCurrentRootsClause(activeRoots.getPaths())));
166            }
167
168            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
169
170            filterBuilder.must(orFilterBuilderIfActiveRoots);
171        }
172
173        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
174        return filterBuilder;
175
176    }
177
178    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
179        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
180        rangeFilter.gt(lowerBound);
181        rangeFilter.lte(upperBound);
182        return rangeFilter;
183    }
184
185    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
186        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
187    }
188
189    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
190        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
191        for (String rootPath : rootPaths) {
192            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
193        }
194        return orFilterRoots;
195    }
196
197    protected BoolQueryBuilder getDriveLogsQueryClause() {
198        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
199        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
200        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
201        return filterBuilder;
202    }
203
204    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
205        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
206        filterBuilder.must(QueryBuilders.termQuery("category", category));
207        if (eventIds != null && eventIds.length > 0) {
208            if (eventIds.length == 1) {
209                if (shouldMatch) {
210                    filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
211                } else {
212                    filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0]));
213                }
214            } else {
215                if (shouldMatch) {
216                    filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
217                } else {
218                    filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds));
219                }
220            }
221        }
222        return filterBuilder;
223    }
224
225    @Override
226    public long getUpperBound() {
227        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
228                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
229                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
230        // TODO refactor this to use max clause
231        builder.setQuery(QueryBuilders.matchAllQuery());
232        builder.addSort("id", SortOrder.DESC);
233        builder.setSize(1);
234        logSearchRequest(builder);
235        SearchResponse searchResponse = builder.execute().actionGet();
236        logSearchResponse(searchResponse);
237        List<LogEntry> entries = new ArrayList<>();
238        SearchHits hits = searchResponse.getHits();
239        for (SearchHit hit : hits) {
240            try {
241                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
242            } catch (IOException e) {
243                log.error("Error while reading Audit Entry from ES", e);
244            }
245        }
246        return entries.size() > 0 ? entries.get(0).getId() : -1;
247    }
248
249    /**
250     * Returns the last available log id in the audit index considering events older than the last clustering
251     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
252     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
253     */
254    @Override
255    public long getUpperBound(Set<String> repositoryNames) {
256        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
257                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
258                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
259        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
260        long clusteringDelay = getClusteringDelay(repositoryNames);
261        if (clusteringDelay > -1) {
262            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
263            RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate")
264                                                           .lt(new Date(lastClusteringInvalidationDate));
265            builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
266        } else {
267            builder.setQuery(queryBuilder);
268        }
269        builder.addSort("id", SortOrder.DESC);
270        builder.setSize(1);
271        logSearchRequest(builder);
272        SearchResponse searchResponse = builder.execute().actionGet();
273        logSearchResponse(searchResponse);
274        List<LogEntry> entries = new ArrayList<>();
275        SearchHits hits = searchResponse.getHits();
276        for (SearchHit hit : hits) {
277            try {
278                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
279            } catch (IOException e) {
280                log.error("Error while reading Audit Entry from ES", e);
281            }
282        }
283        if (entries.isEmpty()) {
284            if (clusteringDelay > -1) {
285                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
286                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
287                // 0
288                builder.setQuery(queryBuilder);
289                logSearchRequest(builder);
290                searchResponse = builder.execute().actionGet();
291                logSearchResponse(searchResponse);
292                if (searchResponse.getHits().iterator().hasNext()) {
293                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
294                    return 0;
295                }
296            }
297            log.debug("Found no audit log entries, returning -1");
298            return -1;
299        }
300        return entries.get(0).getId();
301    }
302
303    @Override
304    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
305            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
306            int limit) {
307        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
308                upperBound, integerBounds, limit);
309        // Post filter the output to remove (un)registration that are unrelated
310        // to the current user.
311        // TODO move this to the ES query
312        List<LogEntry> postFilteredEntries = new ArrayList<>();
313        String principalName = session.getPrincipal().getName();
314        for (LogEntry entry : entries) {
315            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
316            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
317                // ignore event that only impact other users
318                continue;
319            }
320            if (log.isDebugEnabled()) {
321                if (log.isDebugEnabled()) {
322                    log.debug(String.format("Change detected: %s", entry));
323                }
324            }
325            postFilteredEntries.add(entry);
326        }
327        return postFilteredEntries;
328    }
329
330    protected Client getClient() {
331        if (esClient == null) {
332            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
333            esClient = esa.getClient();
334        }
335        return esClient;
336    }
337
338    protected String getESIndexName() {
339        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
340        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
341    }
342
343    protected void logSearchRequest(SearchRequestBuilder request) {
344        if (log.isDebugEnabled()) {
345            log.debug(String.format(
346                    "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
347                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
348        }
349    }
350
351    protected void logSearchResponse(SearchResponse response) {
352        if (log.isDebugEnabled()) {
353            log.debug("Elasticsearch search response: " + response.toString());
354        }
355    }
356}