020package org.nuxeo.drive.elasticsearch;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Set;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.elasticsearch.action.search.SearchRequestBuilder;
031import org.elasticsearch.action.search.SearchResponse;
032import org.elasticsearch.action.search.SearchType;
033import org.elasticsearch.client.Client;
034import org.elasticsearch.index.query.BoolQueryBuilder;
035import org.elasticsearch.index.query.QueryBuilder;
036import org.elasticsearch.index.query.QueryBuilders;
037import org.elasticsearch.index.query.RangeQueryBuilder;
038import org.elasticsearch.index.query.TermsQueryBuilder;
039import org.elasticsearch.search.SearchHit;
040import org.elasticsearch.search.SearchHits;
041import org.elasticsearch.search.sort.SortOrder;
042import org.nuxeo.drive.service.SynchronizationRoots;
043import org.nuxeo.drive.service.impl.AuditChangeFinder;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.elasticsearch.ElasticSearchConstants;
048import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
049import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
050import org.nuxeo.runtime.api.Framework;
/**
 * Overrides the JPA audit-based change finder to execute its queries in Elasticsearch.
054 * <p>
055 * The structure of the query executed by the {@link AuditChangeFinder} is:
056 *
057 * <pre>
058 * from LogEntry log where log.repositoryId = :repositoryId
059 * 
060 * + AND if ActiveRoots (activeRoots) NOT empty
061 * 
 * from LogEntry log where log.repositoryId = :repositoryId and (
 * ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS ) ) or
 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
065 * 
066 * 
067 * if ActiveRoots EMPTY:
068 * 
069 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
070 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
071 * 
072 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by
073 * log.repositoryId asc, log.eventDate desc
074 * </pre>
075 *
076 * @since 7.3
077 */
078public class ESAuditChangeFinder extends AuditChangeFinder {
080    private static final long serialVersionUID = 1L;
082    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);
084    protected Client esClient = null;
086    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
087            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
088            int limit) {
090        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
091                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
092                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
094        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
095        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
096                upperBound, integerBounds, limit);
097        builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
099        builder.addSort("repositoryId", SortOrder.ASC);
100        builder.addSort("eventDate", SortOrder.DESC);
102        List<LogEntry> entries = new ArrayList<>();
103        SearchResponse searchResponse = builder.setSize(limit).execute().actionGet();
104        for (SearchHit hit : searchResponse.getHits()) {
105            try {
106                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
107            } catch (IOException e) {
108                log.error("Error while reading Audit Entry from ES", e);
109            }
110        }
111        return entries;
112    }
114    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
115            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
116            int limit) {
117        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
119        // from LogEntry log where log.repositoryId = :repositoryId
120        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
121        filterBuilder.must(repositoryClauseFilter);
123        if (activeRoots.getPaths().isEmpty()) {
124            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
125            filterBuilder.must(getDriveLogsQueryClause());
126        } else {
128            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();
130            // LIST_DOC_EVENTS_IDS_QUERY
132            // (log.category = 'eventDocumentCategory' and (log.eventId =
133            // 'documentCreated' or log.eventId = 'documentModified' or
134            // log.eventId = 'documentMoved' or log.eventId =
135            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
136            // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId =
137            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
138            // 'eventLifeCycleCategory' and log.eventId =
139            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
140            String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
141                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
142                    "documentUnlocked" };
143            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
144            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
145            orEventsFilter.should(
146                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
147            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));
149            // ROOT_PATHS log.docPath like :rootPath1
150            if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) {
151                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
152                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
153                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
155                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
156                // COLECTIONS_PATHS)
157                // or (log.category = 'NuxeoDrive' and log.eventId !=
158                // 'rootUnregistered') )
159                orFilterBuilderIfActiveRoots.should(
160                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
161            } else {
162                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
163                        getCurrentRootsClause(activeRoots.getPaths())));
164            }
166            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());
168            filterBuilder.must(orFilterBuilderIfActiveRoots);
169        }
171        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
172        return filterBuilder;
174    }
176    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
177        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
178        rangeFilter.gt(lowerBound);
179        rangeFilter.lte(upperBound);
180        return rangeFilter;
181    }
183    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
184        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
185    }
187    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
188        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
189        for (String rootPath : rootPaths) {
190            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
191        }
192        return orFilterRoots;
193    }
195    protected BoolQueryBuilder getDriveLogsQueryClause() {
196        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
197        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
198        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
199        return filterBuilder;
200    }
202    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
203        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
204        filterBuilder.must(QueryBuilders.termQuery("category", category));
205        if (eventIds != null && eventIds.length > 0) {
206            if (eventIds.length == 1) {
207                if (shouldMatch) {
208                    filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
209                } else {
210                    filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0]));
211                }
212            } else {
213                if (shouldMatch) {
214                    filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
215                } else {
216                    filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds));
217                }
218            }
219        }
220        return filterBuilder;
221    }
223    @Override
224    public long getUpperBound() {
225        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
226                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
227                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
228        // TODO refactor this to use max clause
229        builder.setQuery(QueryBuilders.matchAllQuery());
230        builder.addSort("id", SortOrder.DESC);
231        builder.setSize(1);
232        SearchResponse searchResponse = builder.execute().actionGet();
233        List<LogEntry> entries = new ArrayList<>();
234        SearchHits hits = searchResponse.getHits();
235        for (SearchHit hit : hits) {
236            try {
237                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
238            } catch (IOException e) {
239                log.error("Error while reading Audit Entry from ES", e);
240            }
241        }
242        return entries.size() > 0 ? entries.get(0).getId() : -1;
243    }
245    /**
246     * Returns the last available log id in the audit index considering events older than the last clustering
247     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
248     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
249     */
250    @Override
251    public long getUpperBound(Set<String> repositoryNames) {
252        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
253                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
254                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
255        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
256        long clusteringDelay = getClusteringDelay(repositoryNames);
257        if (clusteringDelay > -1) {
258            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
259            RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate")
260                                                           .lt(new Date(lastClusteringInvalidationDate));
261            builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
262        } else {
263            builder.setQuery(queryBuilder);
264        }
265        builder.addSort("id", SortOrder.DESC);
266        builder.setSize(1);
267        SearchResponse searchResponse = builder.execute().actionGet();
268        List<LogEntry> entries = new ArrayList<>();
269        SearchHits hits = searchResponse.getHits();
270        for (SearchHit hit : hits) {
271            try {
272                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
273            } catch (IOException e) {
274                log.error("Error while reading Audit Entry from ES", e);
275            }
276        }
277        if (entries.isEmpty()) {
278            if (clusteringDelay > -1) {
279                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
280                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
281                // 0
282                builder.setQuery(queryBuilder);
283                searchResponse = builder.execute().actionGet();
284                if (searchResponse.getHits().iterator().hasNext()) {
285                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
286                    return 0;
287                }
288            }
289            log.debug("Found no audit log entries, returning -1");
290            return -1;
291        }
292        return entries.get(0).getId();
293    }
295    @Override
296    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
297            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
298            int limit) {
299        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
300                upperBound, integerBounds, limit);
301        // Post filter the output to remove (un)registration that are unrelated
302        // to the current user.
303        // TODO move this to the ES query
304        List<LogEntry> postFilteredEntries = new ArrayList<>();
305        String principalName = session.getPrincipal().getName();
306        for (LogEntry entry : entries) {
307            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
308            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
309                // ignore event that only impact other users
310                continue;
311            }
312            if (log.isDebugEnabled()) {
313                if (log.isDebugEnabled()) {
314                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
315                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
316                            entry.getDocPath()));
317                }
318            }
319            postFilteredEntries.add(entry);
320        }
321        return postFilteredEntries;
322    }
324    protected Client getClient() {
325        if (esClient == null) {
326            log.info("Activate Elasticsearch backend for Audit");
327            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
328            esClient = esa.getClient();
329        }
330        return esClient;
331    }
333    protected String getESIndexName() {
334        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
335        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
336    }