001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Antoine Taillefer <ataillefer@nuxeo.com>
016 */
017package org.nuxeo.drive.service.impl;
018
019import java.util.ArrayList;
020import java.util.Date;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.drive.adapter.FileSystemItem;
029import org.nuxeo.drive.adapter.RootlessItemException;
030import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
031import org.nuxeo.drive.service.FileSystemChangeFinder;
032import org.nuxeo.drive.service.FileSystemItemAdapterService;
033import org.nuxeo.drive.service.FileSystemItemChange;
034import org.nuxeo.drive.service.NuxeoDriveEvents;
035import org.nuxeo.drive.service.NuxeoDriveManager;
036import org.nuxeo.drive.service.SynchronizationRoots;
037import org.nuxeo.drive.service.TooManyChangesException;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.DocumentRef;
041import org.nuxeo.ecm.core.api.IdRef;
042import org.nuxeo.ecm.platform.audit.api.AuditReader;
043import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
044import org.nuxeo.ecm.platform.audit.api.LogEntry;
045import org.nuxeo.runtime.api.Framework;
046
047/**
048 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
049 *
050 * @author Antoine Taillefer
051 */
052public class AuditChangeFinder implements FileSystemChangeFinder {
053
    private static final long serialVersionUID = 1963018967324857522L;

    /** Logger used for the debug traces emitted while querying the audit log and building the change summary. */
    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);

    /** Parameters accumulated through {@link #handleParameters(Map)}. */
    protected Map<String, String> parameters = new HashMap<String, String>();
059
060    @Override
061    public void handleParameters(Map<String, String> parameters) {
062        this.parameters.putAll(parameters);
063    }
064
065    /**
066     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
067     * for backward compatibility.
068     * <p>
069     * Now using event log id for lower and upper bounds to ensure consistency.
070     *
071     * @see https://jira.nuxeo.com/browse/NXP-14826
072     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
073     */
074    @Override
075    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
076            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
077            throws TooManyChangesException {
078        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
079                false, limit);
080    }
081
082    @Override
083    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
084            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
085            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
086        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
087                upperBound, true, limit);
088    }
089
090    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
091            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound,
092            long upperBound, boolean integerBounds, int limit) throws TooManyChangesException {
093        String principalName = session.getPrincipal().getName();
094        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();
095
096        // Note: lastActiveRootRefs is not used: we could remove it from the
097        // public API
098        // and from the client as well but it might be useful to optimize future
099        // alternative implementations FileSystemChangeFinder component so it
100        // might
101        // be better to leave it part of the public API as currently.
102
103        // Find changes from the log under active roots or events that are
104        // linked to the un-registration or deletion of formerly synchronized
105        // roots
106        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
107                upperBound, integerBounds, limit);
108
109        // First pass over the entries to check if a "NuxeoDrive" event has
110        // occurred during that period.
111        // This event can be:
112        // - a root registration
113        // - a root unregistration
114        // - a "deleted" transition
115        // - an "undeleted" transition
116        // - a removal
117        // - a move to an non synchronization root
118        // - a security update
119        // Thus the list of active roots may have changed and the cache might
120        // need to be invalidated: let's make sure we perform a
121        // query with the actual active roots.
122        for (LogEntry entry : entries) {
123            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
124                if (log.isDebugEnabled()) {
125                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
126                            + " invalidating the root cache and refetching the changes.", principalName));
127                }
128                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
129                driveManager.invalidateSynchronizationRootsCache(principalName);
130                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
131                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal());
132                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
133                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
134                        session.getPrincipal()).get(session.getRepositoryName());
135                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds,
136                        lowerBound, upperBound, integerBounds, limit);
137                break;
138            }
139        }
140
141        if (entries.size() >= limit) {
142            throw new TooManyChangesException("Too many changes found in the audit logs.");
143        }
144        for (LogEntry entry : entries) {
145            FileSystemItemChange change = null;
146            DocumentRef docRef = new IdRef(entry.getDocUUID());
147            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
148            if (fsIdInfo != null) {
149                // This document has been deleted, moved, is an unregistered synchronization root or its security has
150                // been updated, we just know the FileSystemItem id and name.
151                if (log.isDebugEnabled()) {
152                    log.debug(String.format(
153                            "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.",
154                            docRef));
155                }
156                boolean isChangeSet = false;
157                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
158                // only in the case of a move or a security update.
159                // This can succeed if this is a move to a synchronization root or a security update after which the
160                // current user still has access to the document.
161                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
162                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
163                    if (change != null) {
164                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
165                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
166                            // virtual event.
167                            if (log.isDebugEnabled()) {
168                                log.debug(String.format(
169                                        "Document %s has been moved to another synchronzation root, not adding entry to the change summary.",
170                                        docRef));
171                            }
172                            continue;
173                        }
174                        isChangeSet = true;
175                    }
176                }
177                if (!isChangeSet) {
178                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
179                    // to a non synchronization root, if its security has been updated denying access to the current
180                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
181                    // FileSystemItem id and name to the FileSystemItemChange entry.
182                    if (log.isDebugEnabled()) {
183                        log.debug(String.format(
184                                "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
185                                docRef));
186                    }
187                    String fsId = fsIdInfo.getValue(String.class);
188                    String fsName = entry.getExtendedInfos().get("fileSystemItemName").getValue(String.class);
189                    String eventId;
190                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
191                        // Move to a non synchronization root
192                        eventId = NuxeoDriveEvents.DELETED_EVENT;
193                    } else {
194                        // Deletion, unregistration or security update
195                        eventId = entry.getEventId();
196                    }
197                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
198                            entry.getRepositoryId(), entry.getDocUUID(), fsId, fsName);
199                }
200                if (log.isDebugEnabled()) {
201                    log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.",
202                            docRef));
203                }
204                changes.add(change);
205            } else {
206                // No extended info in the audit log entry, this should not be a
207                // deleted document, nor an unregistered synchronization root
208                // nor a security update denying access to the current user.
209                if (log.isDebugEnabled()) {
210                    log.debug(String.format(
211                            "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.",
212                            docRef));
213                }
214                if (!session.exists(docRef)) {
215                    if (log.isDebugEnabled()) {
216                        log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.",
217                                docRef));
218                    }
219                    // Deleted or non accessible documents are mapped to
220                    // deleted file system items in a separate event: no need to
221                    // try to propagate this event.
222                    continue;
223                }
224                // Let's try to adapt the document as a FileSystemItem to
225                // provide it to the FileSystemItemChange entry.
226                change = getFileSystemItemChange(session, docRef, entry, null);
227                if (change == null) {
228                    // Non-adaptable documents are ignored
229                    if (log.isDebugEnabled()) {
230                        log.debug(String.format(
231                                "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
232                                docRef));
233                    }
234                } else {
235                    if (log.isDebugEnabled()) {
236                        log.debug(String.format(
237                                "Adding FileSystemItemChange entry for document %s to the change summary.", docRef));
238                    }
239                    changes.add(change);
240                }
241            }
242        }
243        return changes;
244    }
245
246    /**
247     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
248     * <p>
249     * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0
250     * milliseconds to have a consistent behavior across databases.
251     * <p>
252     * Should now use last available log id in the audit log table as upper bound.
253     *
254     * @see https://jira.nuxeo.com/browse/NXP-14826
255     * @see #getUpperBound()
256     */
257    @Override
258    public long getCurrentDate() {
259        long now = System.currentTimeMillis();
260        return now - (now % 1000);
261    }
262
263    /**
264     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
265     * log id range clause in the change query.
266     */
267    @Override
268    @SuppressWarnings("unchecked")
269    public long getUpperBound() {
270        AuditReader auditService = Framework.getService(AuditReader.class);
271        String auditQuery = "from LogEntry log order by log.id desc";
272        if (log.isDebugEnabled()) {
273            if (log.isDebugEnabled()) {
274                log.debug("Querying audit log for greatest id: " + auditQuery);
275            }
276        }
277        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
278        if (entries.isEmpty()) {
279            if (log.isDebugEnabled()) {
280                log.debug("Found no audit log entries, returning -1");
281            }
282            return -1;
283        }
284        return entries.get(0).getId();
285    }
286
287    @SuppressWarnings("unchecked")
288    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
289            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
290        AuditReader auditService = Framework.getLocalService(AuditReader.class);
291        // Set fixed query parameters
292        Map<String, Object> params = new HashMap<String, Object>();
293        params.put("repositoryId", session.getRepositoryName());
294
295        // Build query and set dynamic parameters
296        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
297        auditQuerySb.append("log.repositoryId = :repositoryId");
298        auditQuerySb.append(" and ");
299        auditQuerySb.append("(");
300        if (!activeRoots.getPaths().isEmpty()) {
301            // detect changes under the currently active roots for the
302            // current user
303            auditQuerySb.append("(");
304            auditQuerySb.append("log.category = 'eventDocumentCategory'");
305            // TODO: don't hardcode event ids (contribute them?)
306            auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished')");
307            auditQuerySb.append(" or ");
308            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
309            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
310            auditQuerySb.append(") and (");
311            auditQuerySb.append("(");
312            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
313            auditQuerySb.append(")");
314            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
315                auditQuerySb.append(" or (");
316                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
317                auditQuerySb.append(")");
318            }
319            auditQuerySb.append(") or ");
320        }
321        // Detect any root (un-)registration changes for the roots previously
322        // seen by the current user.
323        // Exclude 'rootUnregistered' since root unregistration is covered by a
324        // "deleted" virtual event.
325        auditQuerySb.append("(");
326        auditQuerySb.append("log.category = '");
327        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
328        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
329        auditQuerySb.append(")");
330        auditQuerySb.append(") and (");
331        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
332        // we intentionally sort by eventDate even if the range filtering is
333        // done on the log id: eventDate is useful to reflect the ordering of
334        // events occurring inside the same transaction while the
335        // monotonic behavior of log id is useful for ensuring that consecutive
336        // range queries to the audit won't miss any events even when long
337        // running transactions are logged after a delay.
338        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
339        String auditQuery = auditQuerySb.toString();
340
341        if (log.isDebugEnabled()) {
342            if (log.isDebugEnabled()) {
343                log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
344            }
345        }
346        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
347
348        // Post filter the output to remove (un)registration that are unrelated
349        // to the current user.
350        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
351        String principalName = session.getPrincipal().getName();
352        for (LogEntry entry : entries) {
353            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
354            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
355                // ignore event that only impact other users
356                continue;
357            }
358            if (log.isDebugEnabled()) {
359                if (log.isDebugEnabled()) {
360                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
361                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
362                            entry.getDocPath()));
363                }
364            }
365            postFilteredEntries.add(entry);
366        }
367        return postFilteredEntries;
368    }
369
370    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
371        StringBuilder rootPathClause = new StringBuilder();
372        int rootPathCount = 0;
373        for (String rootPath : rootPaths) {
374            rootPathCount++;
375            String rootPathParam = "rootPath" + rootPathCount;
376            if (rootPathClause.length() > 0) {
377                rootPathClause.append(" or ");
378            }
379            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
380            params.put(rootPathParam, rootPath + '%');
381
382        }
383        return rootPathClause.toString();
384    }
385
386    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
387            Map<String, Object> params) {
388        String paramName = "collectionMemberIds";
389        params.put(paramName, collectionSyncRootMemberIds);
390        return String.format("log.docUUID in (:%s)", paramName);
391    }
392
393    /**
394     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
395     * <p>
396     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
397     */
398    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
399            Map<String, Object> params) {
400        if (integerBounds) {
401            params.put("lowerBound", lowerBound);
402            params.put("upperBound", upperBound);
403            return "log.id > :lowerBound and log.id <= :upperBound";
404        } else {
405            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
406            params.put("syncDate", new Date(upperBound));
407            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
408        }
409    }
410
411    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
412            String expectedFileSystemItemId) {
413        DocumentModel doc = session.getDocument(docRef);
414        // TODO: check the facet, last root change and list of roots
415        // to have a special handling for the roots.
416        FileSystemItem fsItem = null;
417        try {
418            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc);
419        } catch (RootlessItemException e) {
420            // Can happen for an unregistered synchronization root that cannot
421            // be adapted as a FileSystemItem: nothing to do.
422            if (log.isDebugEnabled()) {
423                log.debug(String.format(
424                        "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef));
425            }
426        }
427        if (fsItem == null) {
428            if (log.isDebugEnabled()) {
429                log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef));
430            }
431            return null;
432        }
433        if (expectedFileSystemItemId != null
434                && !fsItem.getId().endsWith(
435                        AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
436            if (log.isDebugEnabled()) {
437                log.debug(String.format(
438                        "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.",
439                        fsItem.getId(), docRef, expectedFileSystemItemId));
440            }
441            return null;
442        }
443        if (log.isDebugEnabled()) {
444            log.debug(String.format(
445                    "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
446                    docRef));
447        }
448        // EventDate is able to reflect the ordering of the events
449        // inside a transaction (e.g. when several documents are
450        // created, updated, deleted at once) hence it's useful
451        // to pass that info to the client even though the change
452        // detection filtering is using the log id to have a
453        // guaranteed monotonic behavior that evenDate cannot
454        // guarantee when facing long transactions.
455        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(),
456                entry.getRepositoryId(), entry.getDocUUID(), fsItem);
457    }
458
459}