001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.drive.adapter.FileSystemItem;
031import org.nuxeo.drive.adapter.RootlessItemException;
032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
033import org.nuxeo.drive.service.FileSystemChangeFinder;
034import org.nuxeo.drive.service.FileSystemItemAdapterService;
035import org.nuxeo.drive.service.FileSystemItemChange;
036import org.nuxeo.drive.service.NuxeoDriveEvents;
037import org.nuxeo.drive.service.NuxeoDriveManager;
038import org.nuxeo.drive.service.SynchronizationRoots;
039import org.nuxeo.drive.service.TooManyChangesException;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
045import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
046import org.nuxeo.ecm.platform.audit.api.AuditReader;
047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
048import org.nuxeo.ecm.platform.audit.api.LogEntry;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
053 *
054 * @author Antoine Taillefer
055 */
056public class AuditChangeFinder implements FileSystemChangeFinder {
057
058    private static final long serialVersionUID = 1963018967324857522L;
059
060    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);
061
062    protected Map<String, String> parameters = new HashMap<String, String>();
063
064    @Override
065    public void handleParameters(Map<String, String> parameters) {
066        this.parameters.putAll(parameters);
067    }
068
069    /**
070     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
071     * for backward compatibility.
072     * <p>
073     * Now using event log id for lower and upper bounds to ensure consistency.
074     *
075     * @see https://jira.nuxeo.com/browse/NXP-14826
076     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
077     */
078    @Override
079    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
080            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
081            throws TooManyChangesException {
082        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
083                false, limit);
084    }
085
086    @Override
087    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
088            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
089            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
090        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
091                upperBound, true, limit);
092    }
093
094    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
095            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound,
096            long upperBound, boolean integerBounds, int limit) throws TooManyChangesException {
097        String principalName = session.getPrincipal().getName();
098        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();
099
100        // Note: lastActiveRootRefs is not used: we could remove it from the
101        // public API
102        // and from the client as well but it might be useful to optimize future
103        // alternative implementations FileSystemChangeFinder component so it
104        // might
105        // be better to leave it part of the public API as currently.
106
107        // Find changes from the log under active roots or events that are
108        // linked to the un-registration or deletion of formerly synchronized
109        // roots
110        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
111                upperBound, integerBounds, limit);
112
113        // First pass over the entries to check if a "NuxeoDrive" event has
114        // occurred during that period.
115        // This event can be:
116        // - a root registration
117        // - a root unregistration
118        // - a "deleted" transition
119        // - an "undeleted" transition
120        // - a removal
121        // - a move to an non synchronization root
122        // - a security update
123        // Thus the list of active roots may have changed and the cache might
124        // need to be invalidated: let's make sure we perform a
125        // query with the actual active roots.
126        for (LogEntry entry : entries) {
127            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
128                if (log.isDebugEnabled()) {
129                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
130                            + " invalidating the root cache and refetching the changes.", principalName));
131                }
132                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
133                driveManager.invalidateSynchronizationRootsCache(principalName);
134                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
135                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal());
136                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
137                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
138                        session.getPrincipal()).get(session.getRepositoryName());
139                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds,
140                        lowerBound, upperBound, integerBounds, limit);
141                break;
142            }
143        }
144
145        if (entries.size() >= limit) {
146            throw new TooManyChangesException("Too many changes found in the audit logs.");
147        }
148        for (LogEntry entry : entries) {
149            FileSystemItemChange change = null;
150            DocumentRef docRef = new IdRef(entry.getDocUUID());
151            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
152            if (fsIdInfo != null) {
153                // This document has been deleted, moved, is an unregistered synchronization root or its security has
154                // been updated, we just know the FileSystemItem id and name.
155                if (log.isDebugEnabled()) {
156                    log.debug(String.format(
157                            "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.",
158                            docRef));
159                }
160                boolean isChangeSet = false;
161                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
162                // only in the case of a move or a security update.
163                // This can succeed if this is a move to a synchronization root or a security update after which the
164                // current user still has access to the document.
165                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
166                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
167                    if (change != null) {
168                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
169                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
170                            // virtual event.
171                            if (log.isDebugEnabled()) {
172                                log.debug(String.format(
173                                        "Document %s has been moved to another synchronzation root, not adding entry to the change summary.",
174                                        docRef));
175                            }
176                            continue;
177                        }
178                        isChangeSet = true;
179                    }
180                }
181                if (!isChangeSet) {
182                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
183                    // to a non synchronization root, if its security has been updated denying access to the current
184                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
185                    // FileSystemItem id and name to the FileSystemItemChange entry.
186                    if (log.isDebugEnabled()) {
187                        log.debug(String.format(
188                                "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
189                                docRef));
190                    }
191                    String fsId = fsIdInfo.getValue(String.class);
192                    String fsName = entry.getExtendedInfos().get("fileSystemItemName").getValue(String.class);
193                    String eventId;
194                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
195                        // Move to a non synchronization root
196                        eventId = NuxeoDriveEvents.DELETED_EVENT;
197                    } else {
198                        // Deletion, unregistration or security update
199                        eventId = entry.getEventId();
200                    }
201                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
202                            entry.getRepositoryId(), entry.getDocUUID(), fsId, fsName);
203                }
204                if (log.isDebugEnabled()) {
205                    log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.",
206                            docRef));
207                }
208                changes.add(change);
209            } else {
210                // No extended info in the audit log entry, this should not be a
211                // deleted document, nor an unregistered synchronization root
212                // nor a security update denying access to the current user.
213                if (log.isDebugEnabled()) {
214                    log.debug(String.format(
215                            "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.",
216                            docRef));
217                }
218                if (!session.exists(docRef)) {
219                    if (log.isDebugEnabled()) {
220                        log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.",
221                                docRef));
222                    }
223                    // Deleted or non accessible documents are mapped to
224                    // deleted file system items in a separate event: no need to
225                    // try to propagate this event.
226                    continue;
227                }
228                // Let's try to adapt the document as a FileSystemItem to
229                // provide it to the FileSystemItemChange entry.
230                change = getFileSystemItemChange(session, docRef, entry, null);
231                if (change == null) {
232                    // Non-adaptable documents are ignored
233                    if (log.isDebugEnabled()) {
234                        log.debug(String.format(
235                                "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
236                                docRef));
237                    }
238                } else {
239                    if (log.isDebugEnabled()) {
240                        log.debug(String.format(
241                                "Adding FileSystemItemChange entry for document %s to the change summary.", docRef));
242                    }
243                    changes.add(change);
244                }
245            }
246        }
247        return changes;
248    }
249
250    /**
251     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
252     * <p>
253     * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0
254     * milliseconds to have a consistent behavior across databases.
255     * <p>
256     * Should now use last available log id in the audit log table as upper bound.
257     *
258     * @see https://jira.nuxeo.com/browse/NXP-14826
259     * @see #getUpperBound()
260     */
261    @Override
262    public long getCurrentDate() {
263        long now = System.currentTimeMillis();
264        return now - (now % 1000);
265    }
266
267    /**
268     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
269     * log id range clause in the change query.
270     */
271    @Override
272    @SuppressWarnings("unchecked")
273    public long getUpperBound() {
274        AuditReader auditService = Framework.getService(AuditReader.class);
275        String auditQuery = "from LogEntry log order by log.id desc";
276        if (log.isDebugEnabled()) {
277            log.debug("Querying audit log for greatest id: " + auditQuery);
278        }
279        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
280        if (entries.isEmpty()) {
281            if (log.isDebugEnabled()) {
282                log.debug("Found no audit log entries, returning -1");
283            }
284            return -1;
285        }
286        return entries.get(0).getId();
287    }
288
289    /**
290     * Returns the last available log id in the audit log table (primary key) considering events older than the last
291     * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make
292     * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
293     */
294    @Override
295    @SuppressWarnings("unchecked")
296    public long getUpperBound(Set<String> repositoryNames) {
297        long clusteringDelay = getClusteringDelay(repositoryNames);
298        AuditReader auditService = Framework.getService(AuditReader.class);
299        Map<String, Object> params = new HashMap<String, Object>();
300        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log");
301        if (clusteringDelay > -1) {
302            // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826
303            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
304            params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate));
305            auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate");
306        }
307        auditQuerySb.append(" order by log.id desc");
308        String auditQuery = auditQuerySb.toString();
309        if (log.isDebugEnabled()) {
310            log.debug("Querying audit log for greatest id: " + auditQuery + " with params: " + params);
311        }
312        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1);
313        if (entries.isEmpty()) {
314            if (clusteringDelay > -1) {
315                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
316                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
317                // 0
318                List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1);
319                if (!allEntries.isEmpty()) {
320                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
321                    return 0;
322                }
323            }
324            log.debug("Found no audit log entries, returning -1");
325            return -1;
326        }
327        return entries.get(0).getId();
328    }
329
330    /**
331     * Returns the longest clustering delay among the given repositories for which clustering is enabled.
332     */
333    protected long getClusteringDelay(Set<String> repositoryNames) {
334        long clusteringDelay = -1;
335        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
336        for (String repositoryName : repositoryNames) {
337            RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName);
338            if (repositoryDescriptor == null) {
339                // Not a VCS repository`
340                continue;
341            }
342            if (repositoryDescriptor.getClusteringEnabled()) {
343                clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay());
344            }
345        }
346        return clusteringDelay;
347    }
348
349    @SuppressWarnings("unchecked")
350    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
351            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
352        AuditReader auditService = Framework.getLocalService(AuditReader.class);
353        // Set fixed query parameters
354        Map<String, Object> params = new HashMap<String, Object>();
355        params.put("repositoryId", session.getRepositoryName());
356
357        // Build query and set dynamic parameters
358        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
359        auditQuerySb.append("log.repositoryId = :repositoryId");
360        auditQuerySb.append(" and ");
361        auditQuerySb.append("(");
362        if (!activeRoots.getPaths().isEmpty()) {
363            // detect changes under the currently active roots for the
364            // current user
365            auditQuerySb.append("(");
366            auditQuerySb.append("log.category = 'eventDocumentCategory'");
367            // TODO: don't hardcode event ids (contribute them?)
368            auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')");
369            auditQuerySb.append(" or ");
370            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
371            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
372            auditQuerySb.append(") and (");
373            auditQuerySb.append("(");
374            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
375            auditQuerySb.append(")");
376            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
377                auditQuerySb.append(" or (");
378                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
379                auditQuerySb.append(")");
380            }
381            auditQuerySb.append(") or ");
382        }
383        // Detect any root (un-)registration changes for the roots previously
384        // seen by the current user.
385        // Exclude 'rootUnregistered' since root unregistration is covered by a
386        // "deleted" virtual event.
387        auditQuerySb.append("(");
388        auditQuerySb.append("log.category = '");
389        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
390        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
391        auditQuerySb.append(")");
392        auditQuerySb.append(") and (");
393        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
394        // we intentionally sort by eventDate even if the range filtering is
395        // done on the log id: eventDate is useful to reflect the ordering of
396        // events occurring inside the same transaction while the
397        // monotonic behavior of log id is useful for ensuring that consecutive
398        // range queries to the audit won't miss any events even when long
399        // running transactions are logged after a delay.
400        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
401        String auditQuery = auditQuerySb.toString();
402
403        if (log.isDebugEnabled()) {
404            if (log.isDebugEnabled()) {
405                log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
406            }
407        }
408        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
409
410        // Post filter the output to remove (un)registration that are unrelated
411        // to the current user.
412        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
413        String principalName = session.getPrincipal().getName();
414        for (LogEntry entry : entries) {
415            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
416            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
417                // ignore event that only impact other users
418                continue;
419            }
420            if (log.isDebugEnabled()) {
421                if (log.isDebugEnabled()) {
422                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
423                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
424                            entry.getDocPath()));
425                }
426            }
427            postFilteredEntries.add(entry);
428        }
429        return postFilteredEntries;
430    }
431
432    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
433        StringBuilder rootPathClause = new StringBuilder();
434        int rootPathCount = 0;
435        for (String rootPath : rootPaths) {
436            rootPathCount++;
437            String rootPathParam = "rootPath" + rootPathCount;
438            if (rootPathClause.length() > 0) {
439                rootPathClause.append(" or ");
440            }
441            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
442            params.put(rootPathParam, rootPath + '%');
443
444        }
445        return rootPathClause.toString();
446    }
447
448    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
449            Map<String, Object> params) {
450        String paramName = "collectionMemberIds";
451        params.put(paramName, collectionSyncRootMemberIds);
452        return String.format("log.docUUID in (:%s)", paramName);
453    }
454
455    /**
456     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
457     * <p>
458     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
459     */
460    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
461            Map<String, Object> params) {
462        if (integerBounds) {
463            params.put("lowerBound", lowerBound);
464            params.put("upperBound", upperBound);
465            return "log.id > :lowerBound and log.id <= :upperBound";
466        } else {
467            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
468            params.put("syncDate", new Date(upperBound));
469            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
470        }
471    }
472
473    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
474            String expectedFileSystemItemId) {
475        DocumentModel doc = session.getDocument(docRef);
476        // TODO: check the facet, last root change and list of roots
477        // to have a special handling for the roots.
478        FileSystemItem fsItem = null;
479        try {
480            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc);
481        } catch (RootlessItemException e) {
482            // Can happen for an unregistered synchronization root that cannot
483            // be adapted as a FileSystemItem: nothing to do.
484            if (log.isDebugEnabled()) {
485                log.debug(String.format(
486                        "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef));
487            }
488        }
489        if (fsItem == null) {
490            if (log.isDebugEnabled()) {
491                log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef));
492            }
493            return null;
494        }
495        if (expectedFileSystemItemId != null
496                && !fsItem.getId().endsWith(
497                        AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
498            if (log.isDebugEnabled()) {
499                log.debug(String.format(
500                        "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.",
501                        fsItem.getId(), docRef, expectedFileSystemItemId));
502            }
503            return null;
504        }
505        if (log.isDebugEnabled()) {
506            log.debug(String.format(
507                    "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
508                    docRef));
509        }
510        // EventDate is able to reflect the ordering of the events
511        // inside a transaction (e.g. when several documents are
512        // created, updated, deleted at once) hence it's useful
513        // to pass that info to the client even though the change
514        // detection filtering is using the log id to have a
515        // guaranteed monotonic behavior that evenDate cannot
516        // guarantee when facing long transactions.
517        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(),
518                entry.getRepositoryId(), entry.getDocUUID(), fsItem);
519    }
520
521}