001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.drive.adapter.FileSystemItem;
031import org.nuxeo.drive.adapter.RootlessItemException;
032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
033import org.nuxeo.drive.service.FileSystemChangeFinder;
034import org.nuxeo.drive.service.FileSystemItemAdapterService;
035import org.nuxeo.drive.service.FileSystemItemChange;
036import org.nuxeo.drive.service.NuxeoDriveEvents;
037import org.nuxeo.drive.service.NuxeoDriveManager;
038import org.nuxeo.drive.service.SynchronizationRoots;
039import org.nuxeo.drive.service.TooManyChangesException;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentRef;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.platform.audit.api.AuditReader;
045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.runtime.api.Framework;
048
049/**
050 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
051 *
052 * @author Antoine Taillefer
053 */
054public class AuditChangeFinder implements FileSystemChangeFinder {
055
056    private static final long serialVersionUID = 1963018967324857522L;
057
058    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);
059
060    protected Map<String, String> parameters = new HashMap<String, String>();
061
062    @Override
063    public void handleParameters(Map<String, String> parameters) {
064        this.parameters.putAll(parameters);
065    }
066
067    /**
068     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
069     * for backward compatibility.
070     * <p>
071     * Now using event log id for lower and upper bounds to ensure consistency.
072     *
073     * @see https://jira.nuxeo.com/browse/NXP-14826
074     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
075     */
076    @Override
077    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
078            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
079            throws TooManyChangesException {
080        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
081                false, limit);
082    }
083
084    @Override
085    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
086            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
087            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
088        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
089                upperBound, true, limit);
090    }
091
092    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
093            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound,
094            long upperBound, boolean integerBounds, int limit) throws TooManyChangesException {
095        String principalName = session.getPrincipal().getName();
096        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();
097
098        // Note: lastActiveRootRefs is not used: we could remove it from the
099        // public API
100        // and from the client as well but it might be useful to optimize future
101        // alternative implementations FileSystemChangeFinder component so it
102        // might
103        // be better to leave it part of the public API as currently.
104
105        // Find changes from the log under active roots or events that are
106        // linked to the un-registration or deletion of formerly synchronized
107        // roots
108        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
109                upperBound, integerBounds, limit);
110
111        // First pass over the entries to check if a "NuxeoDrive" event has
112        // occurred during that period.
113        // This event can be:
114        // - a root registration
115        // - a root unregistration
116        // - a "deleted" transition
117        // - an "undeleted" transition
118        // - a removal
119        // - a move to an non synchronization root
120        // - a security update
121        // Thus the list of active roots may have changed and the cache might
122        // need to be invalidated: let's make sure we perform a
123        // query with the actual active roots.
124        for (LogEntry entry : entries) {
125            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
126                if (log.isDebugEnabled()) {
127                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
128                            + " invalidating the root cache and refetching the changes.", principalName));
129                }
130                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
131                driveManager.invalidateSynchronizationRootsCache(principalName);
132                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
133                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal());
134                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
135                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
136                        session.getPrincipal()).get(session.getRepositoryName());
137                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds,
138                        lowerBound, upperBound, integerBounds, limit);
139                break;
140            }
141        }
142
143        if (entries.size() >= limit) {
144            throw new TooManyChangesException("Too many changes found in the audit logs.");
145        }
146        for (LogEntry entry : entries) {
147            FileSystemItemChange change = null;
148            DocumentRef docRef = new IdRef(entry.getDocUUID());
149            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
150            if (fsIdInfo != null) {
151                // This document has been deleted, moved, is an unregistered synchronization root or its security has
152                // been updated, we just know the FileSystemItem id and name.
153                if (log.isDebugEnabled()) {
154                    log.debug(String.format(
155                            "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.",
156                            docRef));
157                }
158                boolean isChangeSet = false;
159                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
160                // only in the case of a move or a security update.
161                // This can succeed if this is a move to a synchronization root or a security update after which the
162                // current user still has access to the document.
163                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
164                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
165                    if (change != null) {
166                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
167                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
168                            // virtual event.
169                            if (log.isDebugEnabled()) {
170                                log.debug(String.format(
171                                        "Document %s has been moved to another synchronzation root, not adding entry to the change summary.",
172                                        docRef));
173                            }
174                            continue;
175                        }
176                        isChangeSet = true;
177                    }
178                }
179                if (!isChangeSet) {
180                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
181                    // to a non synchronization root, if its security has been updated denying access to the current
182                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
183                    // FileSystemItem id and name to the FileSystemItemChange entry.
184                    if (log.isDebugEnabled()) {
185                        log.debug(String.format(
186                                "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
187                                docRef));
188                    }
189                    String fsId = fsIdInfo.getValue(String.class);
190                    String fsName = entry.getExtendedInfos().get("fileSystemItemName").getValue(String.class);
191                    String eventId;
192                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
193                        // Move to a non synchronization root
194                        eventId = NuxeoDriveEvents.DELETED_EVENT;
195                    } else {
196                        // Deletion, unregistration or security update
197                        eventId = entry.getEventId();
198                    }
199                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
200                            entry.getRepositoryId(), entry.getDocUUID(), fsId, fsName);
201                }
202                if (log.isDebugEnabled()) {
203                    log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.",
204                            docRef));
205                }
206                changes.add(change);
207            } else {
208                // No extended info in the audit log entry, this should not be a
209                // deleted document, nor an unregistered synchronization root
210                // nor a security update denying access to the current user.
211                if (log.isDebugEnabled()) {
212                    log.debug(String.format(
213                            "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.",
214                            docRef));
215                }
216                if (!session.exists(docRef)) {
217                    if (log.isDebugEnabled()) {
218                        log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.",
219                                docRef));
220                    }
221                    // Deleted or non accessible documents are mapped to
222                    // deleted file system items in a separate event: no need to
223                    // try to propagate this event.
224                    continue;
225                }
226                // Let's try to adapt the document as a FileSystemItem to
227                // provide it to the FileSystemItemChange entry.
228                change = getFileSystemItemChange(session, docRef, entry, null);
229                if (change == null) {
230                    // Non-adaptable documents are ignored
231                    if (log.isDebugEnabled()) {
232                        log.debug(String.format(
233                                "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
234                                docRef));
235                    }
236                } else {
237                    if (log.isDebugEnabled()) {
238                        log.debug(String.format(
239                                "Adding FileSystemItemChange entry for document %s to the change summary.", docRef));
240                    }
241                    changes.add(change);
242                }
243            }
244        }
245        return changes;
246    }
247
248    /**
249     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
250     * <p>
251     * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0
252     * milliseconds to have a consistent behavior across databases.
253     * <p>
254     * Should now use last available log id in the audit log table as upper bound.
255     *
256     * @see https://jira.nuxeo.com/browse/NXP-14826
257     * @see #getUpperBound()
258     */
259    @Override
260    public long getCurrentDate() {
261        long now = System.currentTimeMillis();
262        return now - (now % 1000);
263    }
264
265    /**
266     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
267     * log id range clause in the change query.
268     */
269    @Override
270    @SuppressWarnings("unchecked")
271    public long getUpperBound() {
272        AuditReader auditService = Framework.getService(AuditReader.class);
273        String auditQuery = "from LogEntry log order by log.id desc";
274        if (log.isDebugEnabled()) {
275            if (log.isDebugEnabled()) {
276                log.debug("Querying audit log for greatest id: " + auditQuery);
277            }
278        }
279        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
280        if (entries.isEmpty()) {
281            if (log.isDebugEnabled()) {
282                log.debug("Found no audit log entries, returning -1");
283            }
284            return -1;
285        }
286        return entries.get(0).getId();
287    }
288
289    @SuppressWarnings("unchecked")
290    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
291            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
292        AuditReader auditService = Framework.getLocalService(AuditReader.class);
293        // Set fixed query parameters
294        Map<String, Object> params = new HashMap<String, Object>();
295        params.put("repositoryId", session.getRepositoryName());
296
297        // Build query and set dynamic parameters
298        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
299        auditQuerySb.append("log.repositoryId = :repositoryId");
300        auditQuerySb.append(" and ");
301        auditQuerySb.append("(");
302        if (!activeRoots.getPaths().isEmpty()) {
303            // detect changes under the currently active roots for the
304            // current user
305            auditQuerySb.append("(");
306            auditQuerySb.append("log.category = 'eventDocumentCategory'");
307            // TODO: don't hardcode event ids (contribute them?)
308            auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')");
309            auditQuerySb.append(" or ");
310            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
311            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
312            auditQuerySb.append(") and (");
313            auditQuerySb.append("(");
314            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
315            auditQuerySb.append(")");
316            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
317                auditQuerySb.append(" or (");
318                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
319                auditQuerySb.append(")");
320            }
321            auditQuerySb.append(") or ");
322        }
323        // Detect any root (un-)registration changes for the roots previously
324        // seen by the current user.
325        // Exclude 'rootUnregistered' since root unregistration is covered by a
326        // "deleted" virtual event.
327        auditQuerySb.append("(");
328        auditQuerySb.append("log.category = '");
329        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
330        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
331        auditQuerySb.append(")");
332        auditQuerySb.append(") and (");
333        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
334        // we intentionally sort by eventDate even if the range filtering is
335        // done on the log id: eventDate is useful to reflect the ordering of
336        // events occurring inside the same transaction while the
337        // monotonic behavior of log id is useful for ensuring that consecutive
338        // range queries to the audit won't miss any events even when long
339        // running transactions are logged after a delay.
340        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
341        String auditQuery = auditQuerySb.toString();
342
343        if (log.isDebugEnabled()) {
344            if (log.isDebugEnabled()) {
345                log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
346            }
347        }
348        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
349
350        // Post filter the output to remove (un)registration that are unrelated
351        // to the current user.
352        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
353        String principalName = session.getPrincipal().getName();
354        for (LogEntry entry : entries) {
355            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
356            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
357                // ignore event that only impact other users
358                continue;
359            }
360            if (log.isDebugEnabled()) {
361                if (log.isDebugEnabled()) {
362                    log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
363                            entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
364                            entry.getDocPath()));
365                }
366            }
367            postFilteredEntries.add(entry);
368        }
369        return postFilteredEntries;
370    }
371
372    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
373        StringBuilder rootPathClause = new StringBuilder();
374        int rootPathCount = 0;
375        for (String rootPath : rootPaths) {
376            rootPathCount++;
377            String rootPathParam = "rootPath" + rootPathCount;
378            if (rootPathClause.length() > 0) {
379                rootPathClause.append(" or ");
380            }
381            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
382            params.put(rootPathParam, rootPath + '%');
383
384        }
385        return rootPathClause.toString();
386    }
387
388    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
389            Map<String, Object> params) {
390        String paramName = "collectionMemberIds";
391        params.put(paramName, collectionSyncRootMemberIds);
392        return String.format("log.docUUID in (:%s)", paramName);
393    }
394
395    /**
396     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
397     * <p>
398     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
399     */
400    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
401            Map<String, Object> params) {
402        if (integerBounds) {
403            params.put("lowerBound", lowerBound);
404            params.put("upperBound", upperBound);
405            return "log.id > :lowerBound and log.id <= :upperBound";
406        } else {
407            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
408            params.put("syncDate", new Date(upperBound));
409            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
410        }
411    }
412
413    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
414            String expectedFileSystemItemId) {
415        DocumentModel doc = session.getDocument(docRef);
416        // TODO: check the facet, last root change and list of roots
417        // to have a special handling for the roots.
418        FileSystemItem fsItem = null;
419        try {
420            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc);
421        } catch (RootlessItemException e) {
422            // Can happen for an unregistered synchronization root that cannot
423            // be adapted as a FileSystemItem: nothing to do.
424            if (log.isDebugEnabled()) {
425                log.debug(String.format(
426                        "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef));
427            }
428        }
429        if (fsItem == null) {
430            if (log.isDebugEnabled()) {
431                log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef));
432            }
433            return null;
434        }
435        if (expectedFileSystemItemId != null
436                && !fsItem.getId().endsWith(
437                        AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
438            if (log.isDebugEnabled()) {
439                log.debug(String.format(
440                        "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.",
441                        fsItem.getId(), docRef, expectedFileSystemItemId));
442            }
443            return null;
444        }
445        if (log.isDebugEnabled()) {
446            log.debug(String.format(
447                    "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
448                    docRef));
449        }
450        // EventDate is able to reflect the ordering of the events
451        // inside a transaction (e.g. when several documents are
452        // created, updated, deleted at once) hence it's useful
453        // to pass that info to the client even though the change
454        // detection filtering is using the log id to have a
455        // guaranteed monotonic behavior that evenDate cannot
456        // guarantee when facing long transactions.
457        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(),
458                entry.getRepositoryId(), entry.getDocUUID(), fsItem);
459    }
460
461}