001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <ataillefer@nuxeo.com> 018 */ 019package org.nuxeo.drive.service.impl; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026 027import org.apache.logging.log4j.LogManager; 028import org.apache.logging.log4j.Logger; 029import org.nuxeo.drive.adapter.FileSystemItem; 030import org.nuxeo.drive.adapter.RootlessItemException; 031import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem; 032import org.nuxeo.drive.service.FileSystemChangeFinder; 033import org.nuxeo.drive.service.FileSystemItemAdapterService; 034import org.nuxeo.drive.service.FileSystemItemChange; 035import org.nuxeo.drive.service.NuxeoDriveEvents; 036import org.nuxeo.drive.service.NuxeoDriveManager; 037import org.nuxeo.drive.service.SynchronizationRoots; 038import org.nuxeo.drive.service.TooManyChangesException; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.DocumentRef; 042import org.nuxeo.ecm.core.api.IdRef; 043import org.nuxeo.ecm.platform.audit.api.AuditReader; 044import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 045import org.nuxeo.ecm.platform.audit.api.LogEntry; 046import org.nuxeo.runtime.api.Framework; 047 048/** 049 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}. 050 * 051 * @author Antoine Taillefer 052 */ 053public class AuditChangeFinder implements FileSystemChangeFinder { 054 055 private static final Logger log = LogManager.getLogger(AuditChangeFinder.class); 056 057 protected Map<String, String> parameters = new HashMap<>(); 058 059 @Override 060 public void handleParameters(Map<String, String> parameters) { 061 this.parameters.putAll(parameters); 062 } 063 064 @Override 065 public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs, 066 SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, 067 int limit) { 068 String principalName = session.getPrincipal().getName(); 069 List<FileSystemItemChange> changes = new ArrayList<>(); 070 071 // Note: lastActiveRootRefs is not used: we could remove it from the 072 // public API 073 // and from the client as well but it might be useful to optimize future 074 // alternative implementations FileSystemChangeFinder component so it 075 // might 076 // be better to leave it part of the public API as currently. 077 078 // Find changes from the log under active roots or events that are 079 // linked to the un-registration or deletion of formerly synchronized 080 // roots 081 List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 082 upperBound, limit); 083 084 // First pass over the entries to check if a "NuxeoDrive" event has 085 // occurred during that period. 086 // This event can be: 087 // - a root registration 088 // - a root unregistration 089 // - a "deleted" transition / documentTrashed event 090 // - an "undeleted" transition 091 // - a removal 092 // - a move to an non synchronization root 093 // - a security update 094 // Thus the list of active roots may have changed and the cache might 095 // need to be invalidated: let's make sure we perform a 096 // query with the actual active roots. 097 for (LogEntry entry : entries) { 098 if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) { 099 log.debug("Detected sync root change for user '{}' in audit log:" 100 + " invalidating the root cache and refetching the changes.", principalName); 101 NuxeoDriveManager driveManager = Framework.getService(NuxeoDriveManager.class); 102 driveManager.invalidateSynchronizationRootsCache(principalName); 103 driveManager.invalidateCollectionSyncRootMemberCache(principalName); 104 Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots( 105 session.getPrincipal()); 106 SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName()); 107 Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds( 108 session.getPrincipal()).get(session.getRepositoryName()); 109 entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound, 110 upperBound, limit); 111 break; 112 } 113 } 114 115 if (entries.size() >= limit) { 116 throw new TooManyChangesException("Too many changes found in the audit logs."); 117 } 118 for (LogEntry entry : entries) { 119 log.debug("Handling log entry {}", entry); 120 FileSystemItemChange change = null; 121 DocumentRef docRef = new IdRef(entry.getDocUUID()); 122 ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId"); 123 if (fsIdInfo != null) { 124 // This document has been deleted, moved, is an unregistered synchronization root or its security has 125 // been updated, we just know the FileSystemItem id and name. 126 log.debug("Found extended info in audit log entry: document has been deleted, moved," 127 + " is an unregistered synchronization root or its security has been updated," 128 + " we just know the FileSystemItem id and name."); 129 boolean isChangeSet = false; 130 // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry, 131 // only in the case of a move or a security update. 132 // This can succeed if this is a move to a synchronization root or a security update after which the 133 // current user still has access to the document. 134 if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) { 135 change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class)); 136 if (change != null) { 137 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 138 // A move to a synchronization root also fires a documentMoved event, don't propagate the 139 // virtual event. 140 log.debug( 141 "Document {} ({}) has been moved to another synchronzation root, not adding entry to the change summary.", 142 entry::getDocPath, () -> docRef); 143 continue; 144 } 145 isChangeSet = true; 146 } 147 } 148 if (!isChangeSet) { 149 // If the document has been deleted, is a regular unregistered synchronization root, has been moved 150 // to a non synchronization root, if its security has been updated denying access to the current 151 // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the 152 // FileSystemItem id and name to the FileSystemItemChange entry. 153 log.debug( 154 "Document {} ({}) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.", 155 entry::getDocPath, () -> docRef); 156 String fsId = fsIdInfo.getValue(String.class); 157 String eventId; 158 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 159 // Move to a non synchronization root 160 eventId = NuxeoDriveEvents.DELETED_EVENT; 161 } else { 162 // Deletion, unregistration or security update 163 eventId = entry.getEventId(); 164 } 165 change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(), 166 entry.getRepositoryId(), entry.getDocUUID(), fsId, null); 167 } 168 log.debug("Adding FileSystemItemChange entry to the change summary: {}", change); 169 changes.add(change); 170 } else { 171 // No extended info in the audit log entry, this should not be a deleted document, a moved document, an 172 // unregistered synchronization root nor a security update denying access to the current user. 173 log.debug( 174 "No extended info found in audit log entry {} ({}): this is not a deleted document, a moved document," 175 + " an unregistered synchronization root nor a security update denying access to the current user.", 176 entry::getDocPath, () -> docRef); 177 if (!session.exists(docRef)) { 178 log.debug("Document {} ({}) doesn't exist, not adding entry to the change summary.", 179 entry::getDocPath, () -> docRef); 180 // Deleted or non accessible documents are mapped to 181 // deleted file system items in a separate event: no need to 182 // try to propagate this event. 183 continue; 184 } 185 // Let's try to adapt the document as a FileSystemItem to 186 // provide it to the FileSystemItemChange entry. 187 change = getFileSystemItemChange(session, docRef, entry, null); 188 if (change == null) { 189 // Non-adaptable documents are ignored 190 log.debug( 191 "Document {} ({}) is not adaptable as a FileSystemItem, not adding any entry to the change summary.", 192 entry::getDocPath, () -> docRef); 193 } else { 194 log.debug("Adding FileSystemItemChange entry to the change summary: {}", change); 195 changes.add(change); 196 } 197 } 198 } 199 return changes; 200 } 201 202 /** 203 * Returns the last available log id in the audit log table (primary key) to be used as the upper bound of the event 204 * log id range clause in the change query. 205 */ 206 @Override 207 @SuppressWarnings("unchecked") 208 public long getUpperBound() { 209 AuditReader auditService = Framework.getService(AuditReader.class); 210 String auditQuery = "from LogEntry log order by log.id desc"; 211 log.debug("Querying audit log for greatest id: {}", auditQuery); 212 213 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1); 214 if (entries.isEmpty()) { 215 log.debug("Found no audit log entries, returning -1"); 216 return -1; 217 } 218 return entries.get(0).getId(); 219 } 220 221 @SuppressWarnings("unchecked") 222 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 223 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 224 AuditReader auditService = Framework.getService(AuditReader.class); 225 // Set fixed query parameters 226 Map<String, Object> params = new HashMap<>(); 227 params.put("repositoryId", session.getRepositoryName()); 228 229 // Build query and set dynamic parameters 230 StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where "); 231 auditQuerySb.append("log.repositoryId = :repositoryId"); 232 auditQuerySb.append(" and "); 233 auditQuerySb.append("("); 234 if (!activeRoots.getPaths().isEmpty()) { 235 // detect changes under the currently active roots for the 236 // current user 237 auditQuerySb.append("("); 238 auditQuerySb.append("log.category = 'eventDocumentCategory'"); 239 // TODO: don't hardcode event ids (contribute them?) 240 auditQuerySb.append( 241 " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked' or log.eventId = 'documentUntrashed')"); 242 auditQuerySb.append(" or "); 243 auditQuerySb.append("log.category = 'eventLifeCycleCategory'"); 244 auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' "); 245 auditQuerySb.append(") and ("); 246 auditQuerySb.append("("); 247 auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params)); 248 auditQuerySb.append(")"); 249 if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) { 250 auditQuerySb.append(" or ("); 251 auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params)); 252 auditQuerySb.append(")"); 253 } 254 auditQuerySb.append(") or "); 255 } 256 // Detect any root (un-)registration changes for the roots previously 257 // seen by the current user. 258 // Exclude 'rootUnregistered' since root unregistration is covered by a 259 // "deleted" virtual event. 260 auditQuerySb.append("("); 261 auditQuerySb.append("log.category = '"); 262 auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY); 263 auditQuerySb.append("' and log.eventId != 'rootUnregistered'"); 264 auditQuerySb.append(")"); 265 auditQuerySb.append(") and ("); 266 auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, params)); 267 // we intentionally sort by eventDate even if the range filtering is 268 // done on the log id: eventDate is useful to reflect the ordering of 269 // events occurring inside the same transaction while the 270 // monotonic behavior of log id is useful for ensuring that consecutive 271 // range queries to the audit won't miss any events even when long 272 // running transactions are logged after a delay. 273 auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc"); 274 String auditQuery = auditQuerySb.toString(); 275 276 log.debug("Querying audit log for changes: {} with params: {}", auditQuery, params); 277 278 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit); 279 280 // Post filter the output to remove (un)registration that are unrelated 281 // to the current user. 282 List<LogEntry> postFilteredEntries = new ArrayList<>(); 283 String principalName = session.getPrincipal().getName(); 284 for (LogEntry entry : entries) { 285 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 286 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 287 // ignore event that only impact other users 288 continue; 289 } 290 log.debug("Change detected: {}", entry); 291 postFilteredEntries.add(entry); 292 } 293 return postFilteredEntries; 294 } 295 296 protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) { 297 StringBuilder rootPathClause = new StringBuilder(); 298 int rootPathCount = 0; 299 for (String rootPath : rootPaths) { 300 rootPathCount++; 301 String rootPathParam = "rootPath" + rootPathCount; 302 if (rootPathClause.length() > 0) { 303 rootPathClause.append(" or "); 304 } 305 rootPathClause.append(String.format("log.docPath like :%s", rootPathParam)); 306 params.put(rootPathParam, rootPath + '%'); 307 308 } 309 return rootPathClause.toString(); 310 } 311 312 protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds, 313 Map<String, Object> params) { 314 String paramName = "collectionMemberIds"; 315 params.put(paramName, collectionSyncRootMemberIds); 316 return String.format("log.docUUID in (:%s)", paramName); 317 } 318 319 /** 320 * Using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826. 321 */ 322 protected String getJPARangeClause(long lowerBound, long upperBound, Map<String, Object> params) { 323 params.put("lowerBound", lowerBound); 324 params.put("upperBound", upperBound); 325 return "log.id > :lowerBound and log.id <= :upperBound"; 326 } 327 328 protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry, 329 String expectedFileSystemItemId) { 330 DocumentModel doc = session.getDocument(docRef); 331 // TODO: check the facet, last root change and list of roots 332 // to have a special handling for the roots. 333 FileSystemItem fsItem = null; 334 try { 335 // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo 336 fsItem = Framework.getService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false, 337 false); 338 } catch (RootlessItemException e) { 339 // Can happen for an unregistered synchronization root that cannot 340 // be adapted as a FileSystemItem: nothing to do. 341 log.debug("RootlessItemException thrown while trying to adapt document {} ({}) as a FileSystemItem.", 342 entry::getDocPath, () -> docRef); 343 } 344 if (fsItem == null) { 345 log.debug("Document {} ({}) is not adaptable as a FileSystemItem, returning null.", entry::getDocPath, 346 () -> docRef); 347 return null; 348 } 349 if (expectedFileSystemItemId != null 350 && !fsItem.getId() 351 .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) { 352 log.debug( 353 "Id {} of FileSystemItem adapted from document {} ({}) doesn't match expected FileSystemItem id {}, returning null.", 354 fsItem::getId, entry::getDocPath, () -> docRef, () -> expectedFileSystemItemId); 355 return null; 356 } 357 log.debug("Document {} ({}) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.", 358 entry::getDocPath, () -> docRef); 359 // EventDate is able to reflect the ordering of the events 360 // inside a transaction (e.g. when several documents are 361 // created, updated, deleted at once) hence it's useful 362 // to pass that info to the client even though the change 363 // detection filtering is using the log id to have a 364 // guaranteed monotonic behavior that evenDate cannot 365 // guarantee when facing long transactions. 366 return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(), 367 entry.getDocUUID(), fsItem); 368 } 369 370}