001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.drive.adapter.FileSystemItem;
031import org.nuxeo.drive.adapter.RootlessItemException;
032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
033import org.nuxeo.drive.service.FileSystemChangeFinder;
034import org.nuxeo.drive.service.FileSystemItemAdapterService;
035import org.nuxeo.drive.service.FileSystemItemChange;
036import org.nuxeo.drive.service.NuxeoDriveEvents;
037import org.nuxeo.drive.service.NuxeoDriveManager;
038import org.nuxeo.drive.service.SynchronizationRoots;
039import org.nuxeo.drive.service.TooManyChangesException;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
045import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
046import org.nuxeo.ecm.platform.audit.api.AuditReader;
047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
048import org.nuxeo.ecm.platform.audit.api.LogEntry;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
053 *
054 * @author Antoine Taillefer
055 */
056public class AuditChangeFinder implements FileSystemChangeFinder {
057
058    private static final long serialVersionUID = 1963018967324857522L;
059
060    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);
061
062    protected Map<String, String> parameters = new HashMap<String, String>();
063
064    @Override
065    public void handleParameters(Map<String, String> parameters) {
066        this.parameters.putAll(parameters);
067    }
068
069    /**
070     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
071     * for backward compatibility.
072     * <p>
073     * Now using event log id for lower and upper bounds to ensure consistency.
074     *
075     * @see https://jira.nuxeo.com/browse/NXP-14826
076     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
077     */
078    @Override
079    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
080            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
081            throws TooManyChangesException {
082        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
083                false, limit);
084    }
085
086    @Override
087    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
088            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
089            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
090        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
091                upperBound, true, limit);
092    }
093
094    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
095            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound,
096            boolean integerBounds, int limit) throws TooManyChangesException {
097        String principalName = session.getPrincipal().getName();
098        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();
099
100        // Note: lastActiveRootRefs is not used: we could remove it from the
101        // public API
102        // and from the client as well but it might be useful to optimize future
103        // alternative implementations FileSystemChangeFinder component so it
104        // might
105        // be better to leave it part of the public API as currently.
106
107        // Find changes from the log under active roots or events that are
108        // linked to the un-registration or deletion of formerly synchronized
109        // roots
110        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
111                upperBound, integerBounds, limit);
112
113        // First pass over the entries to check if a "NuxeoDrive" event has
114        // occurred during that period.
115        // This event can be:
116        // - a root registration
117        // - a root unregistration
118        // - a "deleted" transition
119        // - an "undeleted" transition
120        // - a removal
121        // - a move to an non synchronization root
122        // - a security update
123        // Thus the list of active roots may have changed and the cache might
124        // need to be invalidated: let's make sure we perform a
125        // query with the actual active roots.
126        for (LogEntry entry : entries) {
127            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
128                if (log.isDebugEnabled()) {
129                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
130                            + " invalidating the root cache and refetching the changes.", principalName));
131                }
132                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
133                driveManager.invalidateSynchronizationRootsCache(principalName);
134                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
135                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(
136                        session.getPrincipal());
137                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
138                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
139                        session.getPrincipal()).get(session.getRepositoryName());
140                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound,
141                        upperBound, integerBounds, limit);
142                break;
143            }
144        }
145
146        if (entries.size() >= limit) {
147            throw new TooManyChangesException("Too many changes found in the audit logs.");
148        }
149        for (LogEntry entry : entries) {
150            if (log.isDebugEnabled()) {
151                log.debug(String.format("Handling log entry %s", entry));
152            }
153            FileSystemItemChange change = null;
154            DocumentRef docRef = new IdRef(entry.getDocUUID());
155            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
156            if (fsIdInfo != null) {
157                // This document has been deleted, moved, is an unregistered synchronization root or its security has
158                // been updated, we just know the FileSystemItem id and name.
159                if (log.isDebugEnabled()) {
160                    log.debug(String.format("Found extended info in audit log entry: document has been deleted, moved,"
161                            + " is an unregistered synchronization root or its security has been updated,"
162                            + " we just know the FileSystemItem id and name."));
163                }
164                boolean isChangeSet = false;
165                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
166                // only in the case of a move or a security update.
167                // This can succeed if this is a move to a synchronization root or a security update after which the
168                // current user still has access to the document.
169                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
170                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
171                    if (change != null) {
172                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
173                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
174                            // virtual event.
175                            if (log.isDebugEnabled()) {
176                                log.debug(String.format(
177                                        "Document %s (%s) has been moved to another synchronzation root, not adding entry to the change summary.",
178                                        entry.getDocPath(), docRef));
179                            }
180                            continue;
181                        }
182                        isChangeSet = true;
183                    }
184                }
185                if (!isChangeSet) {
186                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
187                    // to a non synchronization root, if its security has been updated denying access to the current
188                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
189                    // FileSystemItem id and name to the FileSystemItemChange entry.
190                    if (log.isDebugEnabled()) {
191                        log.debug(String.format(
192                                "Document %s (%s) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
193                                entry.getDocPath(), docRef));
194                    }
195                    String fsId = fsIdInfo.getValue(String.class);
196                    String eventId;
197                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
198                        // Move to a non synchronization root
199                        eventId = NuxeoDriveEvents.DELETED_EVENT;
200                    } else {
201                        // Deletion, unregistration or security update
202                        eventId = entry.getEventId();
203                    }
204                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
205                            entry.getRepositoryId(), entry.getDocUUID(), fsId, null);
206                }
207                if (log.isDebugEnabled()) {
208                    log.debug(String.format("Adding FileSystemItemChange entry to the change summary: %s", change));
209                }
210                changes.add(change);
211            } else {
212                // No extended info in the audit log entry, this should not be a deleted document, a moved document, an
213                // unregistered synchronization root nor a security update denying access to the current user.
214                if (log.isDebugEnabled()) {
215                    log.debug(String.format(
216                            "No extended info found in audit log entry: this is not a deleted document, a moved document,"
217                                    + " an unregistered synchronization root nor a security update denying access to the current user.",
218                            docRef));
219                }
220                if (!session.exists(docRef)) {
221                    if (log.isDebugEnabled()) {
222                        log.debug(
223                                String.format("Document %s (%s) doesn't exist, not adding entry to the change summary.",
224                                        entry.getDocPath(), docRef));
225                    }
226                    // Deleted or non accessible documents are mapped to
227                    // deleted file system items in a separate event: no need to
228                    // try to propagate this event.
229                    continue;
230                }
231                // Let's try to adapt the document as a FileSystemItem to
232                // provide it to the FileSystemItemChange entry.
233                change = getFileSystemItemChange(session, docRef, entry, null);
234                if (change == null) {
235                    // Non-adaptable documents are ignored
236                    if (log.isDebugEnabled()) {
237                        log.debug(String.format(
238                                "Document %s (%s) is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
239                                entry.getDocPath(), docRef));
240                    }
241                } else {
242                    if (log.isDebugEnabled()) {
243                        log.debug(String.format("Adding FileSystemItemChange entry to the change summary: %s", change));
244                    }
245                    changes.add(change);
246                }
247            }
248        }
249        return changes;
250    }
251
252    /**
253     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
254     * <p>
255     * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0
256     * milliseconds to have a consistent behavior across databases.
257     * <p>
258     * Should now use last available log id in the audit log table as upper bound.
259     *
260     * @see https://jira.nuxeo.com/browse/NXP-14826
261     * @see #getUpperBound()
262     */
263    @Override
264    public long getCurrentDate() {
265        long now = System.currentTimeMillis();
266        return now - (now % 1000);
267    }
268
269    /**
270     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
271     * log id range clause in the change query.
272     */
273    @Override
274    @SuppressWarnings("unchecked")
275    public long getUpperBound() {
276        AuditReader auditService = Framework.getService(AuditReader.class);
277        String auditQuery = "from LogEntry log order by log.id desc";
278        if (log.isDebugEnabled()) {
279            log.debug("Querying audit log for greatest id: " + auditQuery);
280        }
281        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
282        if (entries.isEmpty()) {
283            if (log.isDebugEnabled()) {
284                log.debug("Found no audit log entries, returning -1");
285            }
286            return -1;
287        }
288        return entries.get(0).getId();
289    }
290
291    /**
292     * Returns the last available log id in the audit log table (primary key) considering events older than the last
293     * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make
294     * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
295     */
296    @Override
297    @SuppressWarnings("unchecked")
298    public long getUpperBound(Set<String> repositoryNames) {
299        long clusteringDelay = getClusteringDelay(repositoryNames);
300        AuditReader auditService = Framework.getService(AuditReader.class);
301        Map<String, Object> params = new HashMap<String, Object>();
302        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log");
303        if (clusteringDelay > -1) {
304            // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826
305            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
306            params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate));
307            auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate");
308        }
309        auditQuerySb.append(" order by log.id desc");
310        String auditQuery = auditQuerySb.toString();
311        if (log.isDebugEnabled()) {
312            log.debug("Querying audit log for greatest id: " + auditQuery + " with params: " + params);
313        }
314        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1);
315        if (entries.isEmpty()) {
316            if (clusteringDelay > -1) {
317                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
318                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
319                // 0
320                List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1);
321                if (!allEntries.isEmpty()) {
322                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
323                    return 0;
324                }
325            }
326            log.debug("Found no audit log entries, returning -1");
327            return -1;
328        }
329        return entries.get(0).getId();
330    }
331
332    /**
333     * Returns the longest clustering delay among the given repositories for which clustering is enabled.
334     */
335    protected long getClusteringDelay(Set<String> repositoryNames) {
336        long clusteringDelay = -1;
337        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
338        for (String repositoryName : repositoryNames) {
339            RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName);
340            if (repositoryDescriptor == null) {
341                // Not a VCS repository`
342                continue;
343            }
344            if (repositoryDescriptor.getClusteringEnabled()) {
345                clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay());
346            }
347        }
348        return clusteringDelay;
349    }
350
351    @SuppressWarnings("unchecked")
352    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
353            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
354            int limit) {
355        AuditReader auditService = Framework.getLocalService(AuditReader.class);
356        // Set fixed query parameters
357        Map<String, Object> params = new HashMap<String, Object>();
358        params.put("repositoryId", session.getRepositoryName());
359
360        // Build query and set dynamic parameters
361        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
362        auditQuerySb.append("log.repositoryId = :repositoryId");
363        auditQuerySb.append(" and ");
364        auditQuerySb.append("(");
365        if (!activeRoots.getPaths().isEmpty()) {
366            // detect changes under the currently active roots for the
367            // current user
368            auditQuerySb.append("(");
369            auditQuerySb.append("log.category = 'eventDocumentCategory'");
370            // TODO: don't hardcode event ids (contribute them?)
371            auditQuerySb.append(
372                    " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')");
373            auditQuerySb.append(" or ");
374            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
375            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
376            auditQuerySb.append(") and (");
377            auditQuerySb.append("(");
378            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
379            auditQuerySb.append(")");
380            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
381                auditQuerySb.append(" or (");
382                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
383                auditQuerySb.append(")");
384            }
385            auditQuerySb.append(") or ");
386        }
387        // Detect any root (un-)registration changes for the roots previously
388        // seen by the current user.
389        // Exclude 'rootUnregistered' since root unregistration is covered by a
390        // "deleted" virtual event.
391        auditQuerySb.append("(");
392        auditQuerySb.append("log.category = '");
393        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
394        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
395        auditQuerySb.append(")");
396        auditQuerySb.append(") and (");
397        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
398        // we intentionally sort by eventDate even if the range filtering is
399        // done on the log id: eventDate is useful to reflect the ordering of
400        // events occurring inside the same transaction while the
401        // monotonic behavior of log id is useful for ensuring that consecutive
402        // range queries to the audit won't miss any events even when long
403        // running transactions are logged after a delay.
404        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
405        String auditQuery = auditQuerySb.toString();
406
407        if (log.isDebugEnabled()) {
408            log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
409        }
410        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
411
412        // Post filter the output to remove (un)registration that are unrelated
413        // to the current user.
414        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
415        String principalName = session.getPrincipal().getName();
416        for (LogEntry entry : entries) {
417            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
418            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
419                // ignore event that only impact other users
420                continue;
421            }
422            if (log.isDebugEnabled()) {
423                log.debug(String.format("Change detected: %s", entry));
424            }
425            postFilteredEntries.add(entry);
426        }
427        return postFilteredEntries;
428    }
429
430    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
431        StringBuilder rootPathClause = new StringBuilder();
432        int rootPathCount = 0;
433        for (String rootPath : rootPaths) {
434            rootPathCount++;
435            String rootPathParam = "rootPath" + rootPathCount;
436            if (rootPathClause.length() > 0) {
437                rootPathClause.append(" or ");
438            }
439            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
440            params.put(rootPathParam, rootPath + '%');
441
442        }
443        return rootPathClause.toString();
444    }
445
446    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
447            Map<String, Object> params) {
448        String paramName = "collectionMemberIds";
449        params.put(paramName, collectionSyncRootMemberIds);
450        return String.format("log.docUUID in (:%s)", paramName);
451    }
452
453    /**
454     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
455     * <p>
456     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
457     */
458    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
459            Map<String, Object> params) {
460        if (integerBounds) {
461            params.put("lowerBound", lowerBound);
462            params.put("upperBound", upperBound);
463            return "log.id > :lowerBound and log.id <= :upperBound";
464        } else {
465            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
466            params.put("syncDate", new Date(upperBound));
467            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
468        }
469    }
470
471    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
472            String expectedFileSystemItemId) {
473        DocumentModel doc = session.getDocument(docRef);
474        // TODO: check the facet, last root change and list of roots
475        // to have a special handling for the roots.
476        FileSystemItem fsItem = null;
477        try {
478            // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo
479            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false,
480                    false);
481        } catch (RootlessItemException e) {
482            // Can happen for an unregistered synchronization root that cannot
483            // be adapted as a FileSystemItem: nothing to do.
484            if (log.isDebugEnabled()) {
485                log.debug(String.format(
486                        "RootlessItemException thrown while trying to adapt document %s (%s) as a FileSystemItem.",
487                        entry.getDocPath(), docRef));
488            }
489        }
490        if (fsItem == null) {
491            if (log.isDebugEnabled()) {
492                log.debug(String.format("Document %s (%s) is not adaptable as a FileSystemItem, returning null.",
493                        entry.getDocPath(), docRef));
494            }
495            return null;
496        }
497        if (expectedFileSystemItemId != null
498                && !fsItem.getId()
499                          .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
500            if (log.isDebugEnabled()) {
501                log.debug(String.format(
502                        "Id %s of FileSystemItem adapted from document %s (%s) doesn't match expected FileSystemItem id %s, returning null.",
503                        fsItem.getId(), entry.getDocPath(), docRef, expectedFileSystemItemId));
504            }
505            return null;
506        }
507        if (log.isDebugEnabled()) {
508            log.debug(String.format(
509                    "Document %s (%s) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
510                    entry.getDocPath(), docRef));
511        }
512        // EventDate is able to reflect the ordering of the events
513        // inside a transaction (e.g. when several documents are
514        // created, updated, deleted at once) hence it's useful
515        // to pass that info to the client even though the change
516        // detection filtering is using the log id to have a
517        // guaranteed monotonic behavior that evenDate cannot
518        // guarantee when facing long transactions.
519        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(),
520                entry.getDocUUID(), fsItem);
521    }
522
523}