001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.drive.adapter.FileSystemItem;
031import org.nuxeo.drive.adapter.RootlessItemException;
032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
033import org.nuxeo.drive.service.FileSystemChangeFinder;
034import org.nuxeo.drive.service.FileSystemItemAdapterService;
035import org.nuxeo.drive.service.FileSystemItemChange;
036import org.nuxeo.drive.service.NuxeoDriveEvents;
037import org.nuxeo.drive.service.NuxeoDriveManager;
038import org.nuxeo.drive.service.SynchronizationRoots;
039import org.nuxeo.drive.service.TooManyChangesException;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
045import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
046import org.nuxeo.ecm.platform.audit.api.AuditReader;
047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
048import org.nuxeo.ecm.platform.audit.api.LogEntry;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
053 *
054 * @author Antoine Taillefer
055 */
056public class AuditChangeFinder implements FileSystemChangeFinder {
057
    /**
     * Serialization version identifier. NOTE(review): presumably needed because the implemented interface is
     * serializable - confirm against {@link FileSystemChangeFinder}.
     */
    private static final long serialVersionUID = 1963018967324857522L;

    /** Commons Logging logger used for the debug traces emitted by this class. */
    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);

    /**
     * Parameters accumulated through {@link #handleParameters(Map)}. They are only stored here: no other method of
     * this class reads them.
     */
    protected Map<String, String> parameters = new HashMap<String, String>();
063
064    @Override
065    public void handleParameters(Map<String, String> parameters) {
066        this.parameters.putAll(parameters);
067    }
068
069    /**
070     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
071     * for backward compatibility.
072     * <p>
073     * Now using event log id for lower and upper bounds to ensure consistency.
074     *
075     * @see https://jira.nuxeo.com/browse/NXP-14826
076     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
077     */
078    @Override
079    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
080            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
081            throws TooManyChangesException {
082        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
083                false, limit);
084    }
085
086    @Override
087    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
088            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
089            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
090        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
091                upperBound, true, limit);
092    }
093
094    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
095            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound,
096            long upperBound, boolean integerBounds, int limit) throws TooManyChangesException {
097        String principalName = session.getPrincipal().getName();
098        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();
099
100        // Note: lastActiveRootRefs is not used: we could remove it from the
101        // public API
102        // and from the client as well but it might be useful to optimize future
103        // alternative implementations FileSystemChangeFinder component so it
104        // might
105        // be better to leave it part of the public API as currently.
106
107        // Find changes from the log under active roots or events that are
108        // linked to the un-registration or deletion of formerly synchronized
109        // roots
110        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
111                upperBound, integerBounds, limit);
112
113        // First pass over the entries to check if a "NuxeoDrive" event has
114        // occurred during that period.
115        // This event can be:
116        // - a root registration
117        // - a root unregistration
118        // - a "deleted" transition
119        // - an "undeleted" transition
120        // - a removal
121        // - a move to an non synchronization root
122        // - a security update
123        // Thus the list of active roots may have changed and the cache might
124        // need to be invalidated: let's make sure we perform a
125        // query with the actual active roots.
126        for (LogEntry entry : entries) {
127            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
128                if (log.isDebugEnabled()) {
129                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
130                            + " invalidating the root cache and refetching the changes.", principalName));
131                }
132                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
133                driveManager.invalidateSynchronizationRootsCache(principalName);
134                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
135                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal());
136                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
137                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
138                        session.getPrincipal()).get(session.getRepositoryName());
139                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds,
140                        lowerBound, upperBound, integerBounds, limit);
141                break;
142            }
143        }
144
145        if (entries.size() >= limit) {
146            throw new TooManyChangesException("Too many changes found in the audit logs.");
147        }
148        for (LogEntry entry : entries) {
149            FileSystemItemChange change = null;
150            DocumentRef docRef = new IdRef(entry.getDocUUID());
151            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
152            if (fsIdInfo != null) {
153                // This document has been deleted, moved, is an unregistered synchronization root or its security has
154                // been updated, we just know the FileSystemItem id and name.
155                if (log.isDebugEnabled()) {
156                    log.debug(String.format(
157                            "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.",
158                            docRef));
159                }
160                boolean isChangeSet = false;
161                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
162                // only in the case of a move or a security update.
163                // This can succeed if this is a move to a synchronization root or a security update after which the
164                // current user still has access to the document.
165                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
166                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
167                    if (change != null) {
168                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
169                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
170                            // virtual event.
171                            if (log.isDebugEnabled()) {
172                                log.debug(String.format(
173                                        "Document %s has been moved to another synchronzation root, not adding entry to the change summary.",
174                                        docRef));
175                            }
176                            continue;
177                        }
178                        isChangeSet = true;
179                    }
180                }
181                if (!isChangeSet) {
182                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
183                    // to a non synchronization root, if its security has been updated denying access to the current
184                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
185                    // FileSystemItem id and name to the FileSystemItemChange entry.
186                    if (log.isDebugEnabled()) {
187                        log.debug(String.format(
188                                "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
189                                docRef));
190                    }
191                    String fsId = fsIdInfo.getValue(String.class);
192                    String eventId;
193                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
194                        // Move to a non synchronization root
195                        eventId = NuxeoDriveEvents.DELETED_EVENT;
196                    } else {
197                        // Deletion, unregistration or security update
198                        eventId = entry.getEventId();
199                    }
200                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
201                            entry.getRepositoryId(), entry.getDocUUID(), fsId, null);
202                }
203                if (log.isDebugEnabled()) {
204                    log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.",
205                            docRef));
206                }
207                changes.add(change);
208            } else {
209                // No extended info in the audit log entry, this should not be a
210                // deleted document, nor an unregistered synchronization root
211                // nor a security update denying access to the current user.
212                if (log.isDebugEnabled()) {
213                    log.debug(String.format(
214                            "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.",
215                            docRef));
216                }
217                if (!session.exists(docRef)) {
218                    if (log.isDebugEnabled()) {
219                        log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.",
220                                docRef));
221                    }
222                    // Deleted or non accessible documents are mapped to
223                    // deleted file system items in a separate event: no need to
224                    // try to propagate this event.
225                    continue;
226                }
227                // Let's try to adapt the document as a FileSystemItem to
228                // provide it to the FileSystemItemChange entry.
229                change = getFileSystemItemChange(session, docRef, entry, null);
230                if (change == null) {
231                    // Non-adaptable documents are ignored
232                    if (log.isDebugEnabled()) {
233                        log.debug(String.format(
234                                "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
235                                docRef));
236                    }
237                } else {
238                    if (log.isDebugEnabled()) {
239                        log.debug(String.format(
240                                "Adding FileSystemItemChange entry for document %s to the change summary.", docRef));
241                    }
242                    changes.add(change);
243                }
244            }
245        }
246        return changes;
247    }
248
249    /**
250     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
251     * <p>
252     * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0
253     * milliseconds to have a consistent behavior across databases.
254     * <p>
255     * Should now use last available log id in the audit log table as upper bound.
256     *
257     * @see https://jira.nuxeo.com/browse/NXP-14826
258     * @see #getUpperBound()
259     */
260    @Override
261    public long getCurrentDate() {
262        long now = System.currentTimeMillis();
263        return now - (now % 1000);
264    }
265
266    /**
267     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
268     * log id range clause in the change query.
269     */
270    @Override
271    @SuppressWarnings("unchecked")
272    public long getUpperBound() {
273        AuditReader auditService = Framework.getService(AuditReader.class);
274        String auditQuery = "from LogEntry log order by log.id desc";
275        if (log.isDebugEnabled()) {
276            log.debug("Querying audit log for greatest id: " + auditQuery);
277        }
278        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
279        if (entries.isEmpty()) {
280            if (log.isDebugEnabled()) {
281                log.debug("Found no audit log entries, returning -1");
282            }
283            return -1;
284        }
285        return entries.get(0).getId();
286    }
287
288    /**
289     * Returns the last available log id in the audit log table (primary key) considering events older than the last
290     * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make
291     * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
292     */
293    @Override
294    @SuppressWarnings("unchecked")
295    public long getUpperBound(Set<String> repositoryNames) {
296        long clusteringDelay = getClusteringDelay(repositoryNames);
297        AuditReader auditService = Framework.getService(AuditReader.class);
298        Map<String, Object> params = new HashMap<String, Object>();
299        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log");
300        if (clusteringDelay > -1) {
301            // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826
302            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
303            params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate));
304            auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate");
305        }
306        auditQuerySb.append(" order by log.id desc");
307        String auditQuery = auditQuerySb.toString();
308        if (log.isDebugEnabled()) {
309            log.debug("Querying audit log for greatest id: " + auditQuery + " with params: " + params);
310        }
311        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1);
312        if (entries.isEmpty()) {
313            if (clusteringDelay > -1) {
314                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
315                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
316                // 0
317                List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1);
318                if (!allEntries.isEmpty()) {
319                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
320                    return 0;
321                }
322            }
323            log.debug("Found no audit log entries, returning -1");
324            return -1;
325        }
326        return entries.get(0).getId();
327    }
328
329    /**
330     * Returns the longest clustering delay among the given repositories for which clustering is enabled.
331     */
332    protected long getClusteringDelay(Set<String> repositoryNames) {
333        long clusteringDelay = -1;
334        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
335        for (String repositoryName : repositoryNames) {
336            RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName);
337            if (repositoryDescriptor == null) {
338                // Not a VCS repository`
339                continue;
340            }
341            if (repositoryDescriptor.getClusteringEnabled()) {
342                clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay());
343            }
344        }
345        return clusteringDelay;
346    }
347
348    @SuppressWarnings("unchecked")
349    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
350            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
351        AuditReader auditService = Framework.getLocalService(AuditReader.class);
352        // Set fixed query parameters
353        Map<String, Object> params = new HashMap<String, Object>();
354        params.put("repositoryId", session.getRepositoryName());
355
356        // Build query and set dynamic parameters
357        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
358        auditQuerySb.append("log.repositoryId = :repositoryId");
359        auditQuerySb.append(" and ");
360        auditQuerySb.append("(");
361        if (!activeRoots.getPaths().isEmpty()) {
362            // detect changes under the currently active roots for the
363            // current user
364            auditQuerySb.append("(");
365            auditQuerySb.append("log.category = 'eventDocumentCategory'");
366            // TODO: don't hardcode event ids (contribute them?)
367            auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')");
368            auditQuerySb.append(" or ");
369            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
370            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
371            auditQuerySb.append(") and (");
372            auditQuerySb.append("(");
373            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
374            auditQuerySb.append(")");
375            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
376                auditQuerySb.append(" or (");
377                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
378                auditQuerySb.append(")");
379            }
380            auditQuerySb.append(") or ");
381        }
382        // Detect any root (un-)registration changes for the roots previously
383        // seen by the current user.
384        // Exclude 'rootUnregistered' since root unregistration is covered by a
385        // "deleted" virtual event.
386        auditQuerySb.append("(");
387        auditQuerySb.append("log.category = '");
388        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
389        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
390        auditQuerySb.append(")");
391        auditQuerySb.append(") and (");
392        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
393        // we intentionally sort by eventDate even if the range filtering is
394        // done on the log id: eventDate is useful to reflect the ordering of
395        // events occurring inside the same transaction while the
396        // monotonic behavior of log id is useful for ensuring that consecutive
397        // range queries to the audit won't miss any events even when long
398        // running transactions are logged after a delay.
399        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
400        String auditQuery = auditQuerySb.toString();
401
402        if (log.isDebugEnabled()) {
403            if (log.isDebugEnabled()) {
404                log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
405            }
406        }
407        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
408
409        // Post filter the output to remove (un)registration that are unrelated
410        // to the current user.
411        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
412        String principalName = session.getPrincipal().getName();
413        for (LogEntry entry : entries) {
414            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
415            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
416                // ignore event that only impact other users
417                continue;
418            }
419            if (log.isDebugEnabled()) {
420                if (log.isDebugEnabled()) {
421                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
422                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
423                            entry.getDocPath()));
424                }
425            }
426            postFilteredEntries.add(entry);
427        }
428        return postFilteredEntries;
429    }
430
431    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
432        StringBuilder rootPathClause = new StringBuilder();
433        int rootPathCount = 0;
434        for (String rootPath : rootPaths) {
435            rootPathCount++;
436            String rootPathParam = "rootPath" + rootPathCount;
437            if (rootPathClause.length() > 0) {
438                rootPathClause.append(" or ");
439            }
440            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
441            params.put(rootPathParam, rootPath + '%');
442
443        }
444        return rootPathClause.toString();
445    }
446
447    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
448            Map<String, Object> params) {
449        String paramName = "collectionMemberIds";
450        params.put(paramName, collectionSyncRootMemberIds);
451        return String.format("log.docUUID in (:%s)", paramName);
452    }
453
454    /**
455     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
456     * <p>
457     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
458     */
459    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
460            Map<String, Object> params) {
461        if (integerBounds) {
462            params.put("lowerBound", lowerBound);
463            params.put("upperBound", upperBound);
464            return "log.id > :lowerBound and log.id <= :upperBound";
465        } else {
466            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
467            params.put("syncDate", new Date(upperBound));
468            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
469        }
470    }
471
472    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
473            String expectedFileSystemItemId) {
474        DocumentModel doc = session.getDocument(docRef);
475        // TODO: check the facet, last root change and list of roots
476        // to have a special handling for the roots.
477        FileSystemItem fsItem = null;
478        try {
479            // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo
480            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false,
481                    false);
482        } catch (RootlessItemException e) {
483            // Can happen for an unregistered synchronization root that cannot
484            // be adapted as a FileSystemItem: nothing to do.
485            if (log.isDebugEnabled()) {
486                log.debug(String.format(
487                        "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef));
488            }
489        }
490        if (fsItem == null) {
491            if (log.isDebugEnabled()) {
492                log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef));
493            }
494            return null;
495        }
496        if (expectedFileSystemItemId != null
497                && !fsItem.getId().endsWith(
498                        AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
499            if (log.isDebugEnabled()) {
500                log.debug(String.format(
501                        "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.",
502                        fsItem.getId(), docRef, expectedFileSystemItemId));
503            }
504            return null;
505        }
506        if (log.isDebugEnabled()) {
507            log.debug(String.format(
508                    "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
509                    docRef));
510        }
511        // EventDate is able to reflect the ordering of the events
512        // inside a transaction (e.g. when several documents are
513        // created, updated, deleted at once) hence it's useful
514        // to pass that info to the client even though the change
515        // detection filtering is using the log id to have a
516        // guaranteed monotonic behavior that evenDate cannot
517        // guarantee when facing long transactions.
518        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(),
519                entry.getRepositoryId(), entry.getDocUUID(), fsItem);
520    }
521
522}