001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.drive.service.impl;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026
027import org.apache.logging.log4j.LogManager;
028import org.apache.logging.log4j.Logger;
029import org.nuxeo.drive.adapter.FileSystemItem;
030import org.nuxeo.drive.adapter.RootlessItemException;
031import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
032import org.nuxeo.drive.service.FileSystemChangeFinder;
033import org.nuxeo.drive.service.FileSystemItemAdapterService;
034import org.nuxeo.drive.service.FileSystemItemChange;
035import org.nuxeo.drive.service.NuxeoDriveEvents;
036import org.nuxeo.drive.service.NuxeoDriveManager;
037import org.nuxeo.drive.service.SynchronizationRoots;
038import org.nuxeo.drive.service.TooManyChangesException;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentRef;
042import org.nuxeo.ecm.core.api.IdRef;
043import org.nuxeo.ecm.platform.audit.api.AuditReader;
044import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
045import org.nuxeo.ecm.platform.audit.api.LogEntry;
046import org.nuxeo.runtime.api.Framework;
047
048/**
049 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
050 *
051 * @author Antoine Taillefer
052 */
053public class AuditChangeFinder implements FileSystemChangeFinder {
054
055    private static final Logger log = LogManager.getLogger(AuditChangeFinder.class);
056
057    protected Map<String, String> parameters = new HashMap<>();
058
059    @Override
060    public void handleParameters(Map<String, String> parameters) {
061        this.parameters.putAll(parameters);
062    }
063
064    @Override
065    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
066            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound,
067            int limit) {
068        String principalName = session.getPrincipal().getName();
069        List<FileSystemItemChange> changes = new ArrayList<>();
070
071        // Note: lastActiveRootRefs is not used: we could remove it from the
072        // public API
073        // and from the client as well but it might be useful to optimize future
074        // alternative implementations FileSystemChangeFinder component so it
075        // might
076        // be better to leave it part of the public API as currently.
077
078        // Find changes from the log under active roots or events that are
079        // linked to the un-registration or deletion of formerly synchronized
080        // roots
081        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
082                upperBound, limit);
083
084        // First pass over the entries to check if a "NuxeoDrive" event has
085        // occurred during that period.
086        // This event can be:
087        // - a root registration
088        // - a root unregistration
089        // - a "deleted" transition / documentTrashed event
090        // - an "undeleted" transition
091        // - a removal
092        // - a move to an non synchronization root
093        // - a security update
094        // Thus the list of active roots may have changed and the cache might
095        // need to be invalidated: let's make sure we perform a
096        // query with the actual active roots.
097        for (LogEntry entry : entries) {
098            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
099                log.debug("Detected sync root change for user '{}' in audit log:"
100                        + " invalidating the root cache and refetching the changes.", principalName);
101                NuxeoDriveManager driveManager = Framework.getService(NuxeoDriveManager.class);
102                driveManager.invalidateSynchronizationRootsCache(principalName);
103                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
104                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(
105                        session.getPrincipal());
106                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
107                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
108                        session.getPrincipal()).get(session.getRepositoryName());
109                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound,
110                        upperBound, limit);
111                break;
112            }
113        }
114
115        if (entries.size() >= limit) {
116            throw new TooManyChangesException("Too many changes found in the audit logs.");
117        }
118        for (LogEntry entry : entries) {
119            log.debug("Handling log entry {}", entry);
120            FileSystemItemChange change = null;
121            DocumentRef docRef = new IdRef(entry.getDocUUID());
122            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
123            if (fsIdInfo != null) {
124                // This document has been deleted, moved, is an unregistered synchronization root or its security has
125                // been updated, we just know the FileSystemItem id and name.
126                log.debug("Found extended info in audit log entry: document has been deleted, moved,"
127                        + " is an unregistered synchronization root or its security has been updated,"
128                        + " we just know the FileSystemItem id and name.");
129                boolean isChangeSet = false;
130                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
131                // only in the case of a move or a security update.
132                // This can succeed if this is a move to a synchronization root or a security update after which the
133                // current user still has access to the document.
134                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
135                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
136                    if (change != null) {
137                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
138                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
139                            // virtual event.
140                            log.debug(
141                                    "Document {} ({}) has been moved to another synchronzation root, not adding entry to the change summary.",
142                                    entry::getDocPath, () -> docRef);
143                            continue;
144                        }
145                        isChangeSet = true;
146                    }
147                }
148                if (!isChangeSet) {
149                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
150                    // to a non synchronization root, if its security has been updated denying access to the current
151                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
152                    // FileSystemItem id and name to the FileSystemItemChange entry.
153                    log.debug(
154                            "Document {} ({}) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
155                            entry::getDocPath, () -> docRef);
156                    String fsId = fsIdInfo.getValue(String.class);
157                    String eventId;
158                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
159                        // Move to a non synchronization root
160                        eventId = NuxeoDriveEvents.DELETED_EVENT;
161                    } else {
162                        // Deletion, unregistration or security update
163                        eventId = entry.getEventId();
164                    }
165                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
166                            entry.getRepositoryId(), entry.getDocUUID(), fsId, null);
167                }
168                log.debug("Adding FileSystemItemChange entry to the change summary: {}", change);
169                changes.add(change);
170            } else {
171                // No extended info in the audit log entry, this should not be a deleted document, a moved document, an
172                // unregistered synchronization root nor a security update denying access to the current user.
173                log.debug(
174                        "No extended info found in audit log entry {} ({}): this is not a deleted document, a moved document,"
175                                + " an unregistered synchronization root nor a security update denying access to the current user.",
176                        entry::getDocPath, () -> docRef);
177                if (!session.exists(docRef)) {
178                    log.debug("Document {} ({}) doesn't exist, not adding entry to the change summary.",
179                            entry::getDocPath, () -> docRef);
180                    // Deleted or non accessible documents are mapped to
181                    // deleted file system items in a separate event: no need to
182                    // try to propagate this event.
183                    continue;
184                }
185                // Let's try to adapt the document as a FileSystemItem to
186                // provide it to the FileSystemItemChange entry.
187                change = getFileSystemItemChange(session, docRef, entry, null);
188                if (change == null) {
189                    // Non-adaptable documents are ignored
190                    log.debug(
191                            "Document {} ({}) is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
192                            entry::getDocPath, () -> docRef);
193                } else {
194                    log.debug("Adding FileSystemItemChange entry to the change summary: {}", change);
195                    changes.add(change);
196                }
197            }
198        }
199        return changes;
200    }
201
202    /**
203     * Returns the last available log id in the audit log table (primary key) to be used as the upper bound of the event
204     * log id range clause in the change query.
205     */
206    @Override
207    @SuppressWarnings("unchecked")
208    public long getUpperBound() {
209        AuditReader auditService = Framework.getService(AuditReader.class);
210        String auditQuery = "from LogEntry log order by log.id desc";
211        log.debug("Querying audit log for greatest id: {}", auditQuery);
212
213        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
214        if (entries.isEmpty()) {
215            log.debug("Found no audit log entries, returning -1");
216            return -1;
217        }
218        return entries.get(0).getId();
219    }
220
221    @SuppressWarnings("unchecked")
222    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
223            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) {
224        AuditReader auditService = Framework.getService(AuditReader.class);
225        // Set fixed query parameters
226        Map<String, Object> params = new HashMap<>();
227        params.put("repositoryId", session.getRepositoryName());
228
229        // Build query and set dynamic parameters
230        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
231        auditQuerySb.append("log.repositoryId = :repositoryId");
232        auditQuerySb.append(" and ");
233        auditQuerySb.append("(");
234        if (!activeRoots.getPaths().isEmpty()) {
235            // detect changes under the currently active roots for the
236            // current user
237            auditQuerySb.append("(");
238            auditQuerySb.append("log.category = 'eventDocumentCategory'");
239            // TODO: don't hardcode event ids (contribute them?)
240            auditQuerySb.append(
241                    " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked' or log.eventId = 'documentUntrashed')");
242            auditQuerySb.append(" or ");
243            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
244            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
245            auditQuerySb.append(") and (");
246            auditQuerySb.append("(");
247            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
248            auditQuerySb.append(")");
249            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
250                auditQuerySb.append(" or (");
251                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
252                auditQuerySb.append(")");
253            }
254            auditQuerySb.append(") or ");
255        }
256        // Detect any root (un-)registration changes for the roots previously
257        // seen by the current user.
258        // Exclude 'rootUnregistered' since root unregistration is covered by a
259        // "deleted" virtual event.
260        auditQuerySb.append("(");
261        auditQuerySb.append("log.category = '");
262        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
263        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
264        auditQuerySb.append(")");
265        auditQuerySb.append(") and (");
266        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, params));
267        // we intentionally sort by eventDate even if the range filtering is
268        // done on the log id: eventDate is useful to reflect the ordering of
269        // events occurring inside the same transaction while the
270        // monotonic behavior of log id is useful for ensuring that consecutive
271        // range queries to the audit won't miss any events even when long
272        // running transactions are logged after a delay.
273        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
274        String auditQuery = auditQuerySb.toString();
275
276        log.debug("Querying audit log for changes: {} with params: {}", auditQuery, params);
277
278        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);
279
280        // Post filter the output to remove (un)registration that are unrelated
281        // to the current user.
282        List<LogEntry> postFilteredEntries = new ArrayList<>();
283        String principalName = session.getPrincipal().getName();
284        for (LogEntry entry : entries) {
285            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
286            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
287                // ignore event that only impact other users
288                continue;
289            }
290            log.debug("Change detected: {}", entry);
291            postFilteredEntries.add(entry);
292        }
293        return postFilteredEntries;
294    }
295
296    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
297        StringBuilder rootPathClause = new StringBuilder();
298        int rootPathCount = 0;
299        for (String rootPath : rootPaths) {
300            rootPathCount++;
301            String rootPathParam = "rootPath" + rootPathCount;
302            if (rootPathClause.length() > 0) {
303                rootPathClause.append(" or ");
304            }
305            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
306            params.put(rootPathParam, rootPath + '%');
307
308        }
309        return rootPathClause.toString();
310    }
311
312    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
313            Map<String, Object> params) {
314        String paramName = "collectionMemberIds";
315        params.put(paramName, collectionSyncRootMemberIds);
316        return String.format("log.docUUID in (:%s)", paramName);
317    }
318
319    /**
320     * Using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
321     */
322    protected String getJPARangeClause(long lowerBound, long upperBound, Map<String, Object> params) {
323        params.put("lowerBound", lowerBound);
324        params.put("upperBound", upperBound);
325        return "log.id > :lowerBound and log.id <= :upperBound";
326    }
327
328    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
329            String expectedFileSystemItemId) {
330        DocumentModel doc = session.getDocument(docRef);
331        // TODO: check the facet, last root change and list of roots
332        // to have a special handling for the roots.
333        FileSystemItem fsItem = null;
334        try {
335            // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo
336            fsItem = Framework.getService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false,
337                    false);
338        } catch (RootlessItemException e) {
339            // Can happen for an unregistered synchronization root that cannot
340            // be adapted as a FileSystemItem: nothing to do.
341            log.debug("RootlessItemException thrown while trying to adapt document {} ({}) as a FileSystemItem.",
342                    entry::getDocPath, () -> docRef);
343        }
344        if (fsItem == null) {
345            log.debug("Document {} ({}) is not adaptable as a FileSystemItem, returning null.", entry::getDocPath,
346                    () -> docRef);
347            return null;
348        }
349        if (expectedFileSystemItemId != null
350                && !fsItem.getId()
351                          .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
352            log.debug(
353                    "Id {} of FileSystemItem adapted from document {} ({}) doesn't match expected FileSystemItem id {}, returning null.",
354                    fsItem::getId, entry::getDocPath, () -> docRef, () -> expectedFileSystemItemId);
355            return null;
356        }
357        log.debug("Document {} ({}) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
358                entry::getDocPath, () -> docRef);
359        // EventDate is able to reflect the ordering of the events
360        // inside a transaction (e.g. when several documents are
361        // created, updated, deleted at once) hence it's useful
362        // to pass that info to the client even though the change
363        // detection filtering is using the log id to have a
364        // guaranteed monotonic behavior that evenDate cannot
365        // guarantee when facing long transactions.
366        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(),
367                entry.getDocUUID(), fsItem);
368    }
369
370}