001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.logging.log4j.LogManager;
029import org.apache.logging.log4j.Logger;
030import org.nuxeo.drive.adapter.FileSystemItem;
031import org.nuxeo.drive.adapter.RootlessItemException;
032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
033import org.nuxeo.drive.service.FileSystemChangeFinder;
034import org.nuxeo.drive.service.FileSystemItemAdapterService;
035import org.nuxeo.drive.service.FileSystemItemChange;
036import org.nuxeo.drive.service.NuxeoDriveEvents;
037import org.nuxeo.drive.service.NuxeoDriveManager;
038import org.nuxeo.drive.service.SynchronizationRoots;
039import org.nuxeo.drive.service.TooManyChangesException;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
045import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
046import org.nuxeo.ecm.platform.audit.api.AuditReader;
047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
048import org.nuxeo.ecm.platform.audit.api.LogEntry;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
053 *
054 * @author Antoine Taillefer
055 */
056public class AuditChangeFinder implements FileSystemChangeFinder {
057
058    private static final Logger log = LogManager.getLogger(AuditChangeFinder.class);
059
060    protected Map<String, String> parameters = new HashMap<>();
061
062    @Override
063    public void handleParameters(Map<String, String> parameters) {
064        this.parameters.putAll(parameters);
065    }
066
    /**
     * Computes the file system item changes found in the audit log for the principal of the given session.
     * <p>
     * The audit entries are fetched with {@code queryAuditEntries} using the given roots, bounds and limit. If at
     * least one of them belongs to the {@link NuxeoDriveEvents#EVENT_CATEGORY} category, the synchronization root and
     * collection member caches of the principal are invalidated and the entries are fetched again with the refreshed
     * roots. Each entry is then either turned into a {@link FileSystemItemChange} or skipped.
     *
     * @param session the session used to resolve the principal, the repository name and the documents
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the synchronization roots used for the first audit query
     * @param collectionSyncRootMemberIds the collection member ids used for the first audit query
     * @param lowerBound the lower bound passed to the audit query
     * @param upperBound the upper bound passed to the audit query
     * @param limit the maximum number of entries requested from the audit query
     * @return the list of changes, in the order of the audit entries
     * @throws TooManyChangesException if the number of entries returned by the audit query is greater than or equal
     *             to {@code limit}
     */
    @Override
    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound,
            int limit) {
        String principalName = session.getPrincipal().getName();
        List<FileSystemItemChange> changes = new ArrayList<>();

        // Note: lastActiveRootRefs is not used. It could be removed from the public API and from the client as well,
        // but it might be useful to optimize future alternative implementations of the FileSystemChangeFinder
        // component, so it is left as part of the public API.

        // Find changes from the log under active roots or events that are
        // linked to the un-registration or deletion of formerly synchronized
        // roots
        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, limit);

        // First pass over the entries to check if a "NuxeoDrive" event has
        // occurred during that period.
        // This event can be:
        // - a root registration
        // - a root unregistration
        // - a "deleted" transition / documentTrashed event
        // - an "undeleted" transition
        // - a removal
        // - a move to a non synchronization root
        // - a security update
        // Thus the list of active roots may have changed and the cache might
        // need to be invalidated: let's make sure we perform a
        // query with the actual active roots.
        for (LogEntry entry : entries) {
            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
                log.debug("Detected sync root change for user '{}' in audit log:"
                        + " invalidating the root cache and refetching the changes.", principalName);
                NuxeoDriveManager driveManager = Framework.getService(NuxeoDriveManager.class);
                driveManager.invalidateSynchronizationRootsCache(principalName);
                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(
                        session.getPrincipal());
                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
                        session.getPrincipal()).get(session.getRepositoryName());
                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound,
                        upperBound, limit);
                // A single refetch is enough, no need to look at the remaining entries of the first query
                break;
            }
        }

        // NOTE(review): the size is checked on the list returned by queryAuditEntries, which has already dropped the
        // entries only impacting other users, so a raw audit result truncated at `limit` could pass this check --
        // confirm this is intended.
        if (entries.size() >= limit) {
            throw new TooManyChangesException("Too many changes found in the audit logs.");
        }
        for (LogEntry entry : entries) {
            log.debug("Handling log entry {}", entry);
            FileSystemItemChange change = null;
            DocumentRef docRef = new IdRef(entry.getDocUUID());
            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
            if (fsIdInfo != null) {
                // This document has been deleted, moved, is an unregistered synchronization root or its security has
                // been updated, we just know the FileSystemItem id and name.
                log.debug("Found extended info in audit log entry: document has been deleted, moved,"
                        + " is an unregistered synchronization root or its security has been updated,"
                        + " we just know the FileSystemItem id and name.");
                boolean isChangeSet = false;
                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
                // only in the case of a move or a security update.
                // This can succeed if this is a move to a synchronization root or a security update after which the
                // current user still has access to the document.
                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
                    if (change != null) {
                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
                            // virtual event.
                            log.debug(
                                    "Document {} ({}) has been moved to another synchronzation root, not adding entry to the change summary.",
                                    entry::getDocPath, () -> docRef);
                            continue;
                        }
                        isChangeSet = true;
                    }
                }
                if (!isChangeSet) {
                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
                    // to a non synchronization root, if its security has been updated denying access to the current
                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
                    // FileSystemItem id and name to the FileSystemItemChange entry.
                    log.debug(
                            "Document {} ({}) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
                            entry::getDocPath, () -> docRef);
                    String fsId = fsIdInfo.getValue(String.class);
                    String eventId;
                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                        // Move to a non synchronization root
                        eventId = NuxeoDriveEvents.DELETED_EVENT;
                    } else {
                        // Deletion, unregistration or security update
                        eventId = entry.getEventId();
                    }
                    // No FileSystemItem is available here, hence the null last argument
                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
                            entry.getRepositoryId(), entry.getDocUUID(), fsId, null);
                }
                log.debug("Adding FileSystemItemChange entry to the change summary: {}", change);
                changes.add(change);
            } else {
                // No extended info in the audit log entry, this should not be a deleted document, a moved document, an
                // unregistered synchronization root nor a security update denying access to the current user.
                log.debug(
                        "No extended info found in audit log entry {} ({}): this is not a deleted document, a moved document,"
                                + " an unregistered synchronization root nor a security update denying access to the current user.",
                        entry::getDocPath, () -> docRef);
                if (!session.exists(docRef)) {
                    log.debug("Document {} ({}) doesn't exist, not adding entry to the change summary.",
                            entry::getDocPath, () -> docRef);
                    // Deleted or non accessible documents are mapped to
                    // deleted file system items in a separate event: no need to
                    // try to propagate this event.
                    continue;
                }
                // Let's try to adapt the document as a FileSystemItem to
                // provide it to the FileSystemItemChange entry.
                change = getFileSystemItemChange(session, docRef, entry, null);
                if (change == null) {
                    // Non-adaptable documents are ignored
                    log.debug(
                            "Document {} ({}) is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
                            entry::getDocPath, () -> docRef);
                } else {
                    log.debug("Adding FileSystemItemChange entry to the change summary: {}", change);
                    changes.add(change);
                }
            }
        }
        return changes;
    }
204
205    /**
206     * Returns the last available log id in the audit log table (primary key) to be used as the upper bound of the event
207     * log id range clause in the change query.
208     */
209    @Override
210    @SuppressWarnings("unchecked")
211    public long getUpperBound() {
212        AuditReader auditService = Framework.getService(AuditReader.class);
213        String auditQuery = "from LogEntry log order by log.id desc";
214        log.debug("Querying audit log for greatest id: {}", auditQuery);
215
216        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
217        if (entries.isEmpty()) {
218            log.debug("Found no audit log entries, returning -1");
219            return -1;
220        }
221        return entries.get(0).getId();
222    }
223
224    /**
225     * Returns the last available log id in the audit log table (primary key) considering events older than the last
226     * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make
227     * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
228     */
229    @Override
230    @SuppressWarnings("unchecked")
231    public long getUpperBound(Set<String> repositoryNames) {
232        long clusteringDelay = getClusteringDelay(repositoryNames);
233        AuditReader auditService = Framework.getService(AuditReader.class);
234        Map<String, Object> params = new HashMap<>();
235        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log");
236        if (clusteringDelay > -1) {
237            // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826
238            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
239            params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate));
240            auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate");
241        }
242        auditQuerySb.append(" order by log.id desc");
243        String auditQuery = auditQuerySb.toString();
244        log.debug("Querying audit log for greatest id: {} with params: {}", auditQuery, params);
245
246        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1);
247        if (entries.isEmpty()) {
248            if (clusteringDelay > -1) {
249                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
250                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
251                // 0
252                List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1);
253                if (!allEntries.isEmpty()) {
254                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
255                    return 0;
256                }
257            }
258            log.debug("Found no audit log entries, returning -1");
259            return -1;
260        }
261        return entries.get(0).getId();
262    }
263
264    /**
265     * Returns the longest clustering delay among the given repositories for which clustering is enabled.
266     */
267    protected long getClusteringDelay(Set<String> repositoryNames) {
268        long clusteringDelay = -1;
269        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
270        for (String repositoryName : repositoryNames) {
271            RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName);
272            if (repositoryDescriptor == null) {
273                // Not a VCS repository`
274                continue;
275            }
276            if (repositoryDescriptor.getClusteringEnabled()) {
277                clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay());
278            }
279        }
280        return clusteringDelay;
281    }
282
283    @SuppressWarnings("unchecked")
284    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
285            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
286        AuditReader auditService = Framework.getService(AuditReader.class);
287        // Set fixed query parameters
288        Map<String, Object> params = new HashMap<>();
289        params.put("repositoryId", session.getRepositoryName());
290
291        // Build query and set dynamic parameters
292        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
293        auditQuerySb.append("log.repositoryId = :repositoryId");
294        auditQuerySb.append(" and ");
295        auditQuerySb.append("(");
296        if (!activeRoots.getPaths().isEmpty()) {
297            // detect changes under the currently active roots for the
298            // current user
299            auditQuerySb.append("(");
300            auditQuerySb.append("log.category = 'eventDocumentCategory'");
301            // TODO: don't hardcode event ids (contribute them?)
302            auditQuerySb.append(
303                    " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked' or log.eventId = 'documentUntrashed')");
304            auditQuerySb.append(" or ");
305            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
306            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
307            auditQuerySb.append(") and (");
308            auditQuerySb.append("(");
309            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
310            auditQuerySb.append(")");
311            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
312                auditQuerySb.append(" or (");
313                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
314                auditQuerySb.append(")");
315            }
316            auditQuerySb.append(") or ");
317        }
318        // Detect any root (un-)registration changes for the roots previously
319        // seen by the current user.
320        // Exclude 'rootUnregistered' since root unregistration is covered by a
321        // "deleted" virtual event.
322        auditQuerySb.append("(");
323        auditQuerySb.append("log.category = '");
324        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
325        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
326        auditQuerySb.append(")");
327        auditQuerySb.append(") and (");
328        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, params));
329        // we intentionally sort by eventDate even if the range filtering is
330        // done on the log id: eventDate is useful to reflect the ordering of
331        // events occurring inside the same transaction while the
332        // monotonic behavior of log id is useful for ensuring that consecutive
333        // range queries to the audit won't miss any events even when long
334        // running transactions are logged after a delay.
335        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
336        String auditQuery = auditQuerySb.toString();
337
338        log.debug("Querying audit log for changes: {} with params: {}", auditQuery, params);
339
340        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
341
342        // Post filter the output to remove (un)registration that are unrelated
343        // to the current user.
344        List<LogEntry> postFilteredEntries = new ArrayList<>();
345        String principalName = session.getPrincipal().getName();
346        for (LogEntry entry : entries) {
347            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
348            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
349                // ignore event that only impact other users
350                continue;
351            }
352            log.debug("Change detected: {}", entry);
353            postFilteredEntries.add(entry);
354        }
355        return postFilteredEntries;
356    }
357
358    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
359        StringBuilder rootPathClause = new StringBuilder();
360        int rootPathCount = 0;
361        for (String rootPath : rootPaths) {
362            rootPathCount++;
363            String rootPathParam = "rootPath" + rootPathCount;
364            if (rootPathClause.length() > 0) {
365                rootPathClause.append(" or ");
366            }
367            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
368            params.put(rootPathParam, rootPath + '%');
369
370        }
371        return rootPathClause.toString();
372    }
373
374    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
375            Map<String, Object> params) {
376        String paramName = "collectionMemberIds";
377        params.put(paramName, collectionSyncRootMemberIds);
378        return String.format("log.docUUID in (:%s)", paramName);
379    }
380
381    /**
382     * Using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
383     */
384    protected String getJPARangeClause(long lowerBound, long upperBound, Map<String, Object> params) {
385        params.put("lowerBound", lowerBound);
386        params.put("upperBound", upperBound);
387        return "log.id > :lowerBound and log.id <= :upperBound";
388    }
389
390    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
391            String expectedFileSystemItemId) {
392        DocumentModel doc = session.getDocument(docRef);
393        // TODO: check the facet, last root change and list of roots
394        // to have a special handling for the roots.
395        FileSystemItem fsItem = null;
396        try {
397            // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo
398            fsItem = Framework.getService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false,
399                    false);
400        } catch (RootlessItemException e) {
401            // Can happen for an unregistered synchronization root that cannot
402            // be adapted as a FileSystemItem: nothing to do.
403            log.debug("RootlessItemException thrown while trying to adapt document {} ({}) as a FileSystemItem.",
404                    entry::getDocPath, () -> docRef);
405        }
406        if (fsItem == null) {
407            log.debug("Document {} ({}) is not adaptable as a FileSystemItem, returning null.", entry::getDocPath,
408                    () -> docRef);
409            return null;
410        }
411        if (expectedFileSystemItemId != null
412                && !fsItem.getId()
413                          .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
414            log.debug(
415                    "Id {} of FileSystemItem adapted from document {} ({}) doesn't match expected FileSystemItem id {}, returning null.",
416                    fsItem::getId, entry::getDocPath, () -> docRef, () -> expectedFileSystemItemId);
417            return null;
418        }
419        log.debug("Document {} ({}) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
420                entry::getDocPath, () -> docRef);
421        // EventDate is able to reflect the ordering of the events
422        // inside a transaction (e.g. when several documents are
423        // created, updated, deleted at once) hence it's useful
424        // to pass that info to the client even though the change
425        // detection filtering is using the log id to have a
426        // guaranteed monotonic behavior that evenDate cannot
427        // guarantee when facing long transactions.
428        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(),
429                entry.getDocUUID(), fsItem);
430    }
431
432}