001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <ataillefer@nuxeo.com> 018 */ 019package org.nuxeo.drive.service.impl; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.drive.adapter.FileSystemItem; 031import org.nuxeo.drive.adapter.RootlessItemException; 032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem; 033import org.nuxeo.drive.service.FileSystemChangeFinder; 034import org.nuxeo.drive.service.FileSystemItemAdapterService; 035import org.nuxeo.drive.service.FileSystemItemChange; 036import org.nuxeo.drive.service.NuxeoDriveEvents; 037import org.nuxeo.drive.service.NuxeoDriveManager; 038import org.nuxeo.drive.service.SynchronizationRoots; 039import org.nuxeo.drive.service.TooManyChangesException; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.DocumentRef; 043import org.nuxeo.ecm.core.api.IdRef; 044import org.nuxeo.ecm.platform.audit.api.AuditReader; 045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 046import org.nuxeo.ecm.platform.audit.api.LogEntry; 047import org.nuxeo.runtime.api.Framework; 048 049/** 050 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}. 051 * 052 * @author Antoine Taillefer 053 */ 054public class AuditChangeFinder implements FileSystemChangeFinder { 055 056 private static final long serialVersionUID = 1963018967324857522L; 057 058 private static final Log log = LogFactory.getLog(AuditChangeFinder.class); 059 060 protected Map<String, String> parameters = new HashMap<String, String>(); 061 062 @Override 063 public void handleParameters(Map<String, String> parameters) { 064 this.parameters.putAll(parameters); 065 } 066 067 /** 068 * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date 069 * for backward compatibility. 070 * <p> 071 * Now using event log id for lower and upper bounds to ensure consistency. 072 * 073 * @see https://jira.nuxeo.com/browse/NXP-14826 074 * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int) 075 */ 076 @Override 077 public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs, 078 SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit) 079 throws TooManyChangesException { 080 return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate, 081 false, limit); 082 } 083 084 @Override 085 public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session, 086 Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, 087 long lowerBound, long upperBound, int limit) throws TooManyChangesException { 088 return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound, 089 upperBound, true, limit); 090 } 091 092 protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs, 093 SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, 094 long upperBound, boolean integerBounds, int limit) throws TooManyChangesException { 095 String principalName = session.getPrincipal().getName(); 096 List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>(); 097 098 // Note: lastActiveRootRefs is not used: we could remove it from the 099 // public API 100 // and from the client as well but it might be useful to optimize future 101 // alternative implementations FileSystemChangeFinder component so it 102 // might 103 // be better to leave it part of the public API as currently. 104 105 // Find changes from the log under active roots or events that are 106 // linked to the un-registration or deletion of formerly synchronized 107 // roots 108 List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 109 upperBound, integerBounds, limit); 110 111 // First pass over the entries to check if a "NuxeoDrive" event has 112 // occurred during that period. 113 // This event can be: 114 // - a root registration 115 // - a root unregistration 116 // - a "deleted" transition 117 // - an "undeleted" transition 118 // - a removal 119 // - a move to an non synchronization root 120 // - a security update 121 // Thus the list of active roots may have changed and the cache might 122 // need to be invalidated: let's make sure we perform a 123 // query with the actual active roots. 124 for (LogEntry entry : entries) { 125 if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) { 126 if (log.isDebugEnabled()) { 127 log.debug(String.format("Detected sync root change for user '%s' in audit log:" 128 + " invalidating the root cache and refetching the changes.", principalName)); 129 } 130 NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class); 131 driveManager.invalidateSynchronizationRootsCache(principalName); 132 driveManager.invalidateCollectionSyncRootMemberCache(principalName); 133 Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal()); 134 SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName()); 135 Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds( 136 session.getPrincipal()).get(session.getRepositoryName()); 137 entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, 138 lowerBound, upperBound, integerBounds, limit); 139 break; 140 } 141 } 142 143 if (entries.size() >= limit) { 144 throw new TooManyChangesException("Too many changes found in the audit logs."); 145 } 146 for (LogEntry entry : entries) { 147 FileSystemItemChange change = null; 148 DocumentRef docRef = new IdRef(entry.getDocUUID()); 149 ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId"); 150 if (fsIdInfo != null) { 151 // This document has been deleted, moved, is an unregistered synchronization root or its security has 152 // been updated, we just know the FileSystemItem id and name. 153 if (log.isDebugEnabled()) { 154 log.debug(String.format( 155 "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.", 156 docRef)); 157 } 158 boolean isChangeSet = false; 159 // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry, 160 // only in the case of a move or a security update. 161 // This can succeed if this is a move to a synchronization root or a security update after which the 162 // current user still has access to the document. 163 if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) { 164 change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class)); 165 if (change != null) { 166 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 167 // A move to a synchronization root also fires a documentMoved event, don't propagate the 168 // virtual event. 169 if (log.isDebugEnabled()) { 170 log.debug(String.format( 171 "Document %s has been moved to another synchronzation root, not adding entry to the change summary.", 172 docRef)); 173 } 174 continue; 175 } 176 isChangeSet = true; 177 } 178 } 179 if (!isChangeSet) { 180 // If the document has been deleted, is a regular unregistered synchronization root, has been moved 181 // to a non synchronization root, if its security has been updated denying access to the current 182 // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the 183 // FileSystemItem id and name to the FileSystemItemChange entry. 184 if (log.isDebugEnabled()) { 185 log.debug(String.format( 186 "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.", 187 docRef)); 188 } 189 String fsId = fsIdInfo.getValue(String.class); 190 String fsName = entry.getExtendedInfos().get("fileSystemItemName").getValue(String.class); 191 String eventId; 192 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 193 // Move to a non synchronization root 194 eventId = NuxeoDriveEvents.DELETED_EVENT; 195 } else { 196 // Deletion, unregistration or security update 197 eventId = entry.getEventId(); 198 } 199 change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(), 200 entry.getRepositoryId(), entry.getDocUUID(), fsId, fsName); 201 } 202 if (log.isDebugEnabled()) { 203 log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.", 204 docRef)); 205 } 206 changes.add(change); 207 } else { 208 // No extended info in the audit log entry, this should not be a 209 // deleted document, nor an unregistered synchronization root 210 // nor a security update denying access to the current user. 211 if (log.isDebugEnabled()) { 212 log.debug(String.format( 213 "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.", 214 docRef)); 215 } 216 if (!session.exists(docRef)) { 217 if (log.isDebugEnabled()) { 218 log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.", 219 docRef)); 220 } 221 // Deleted or non accessible documents are mapped to 222 // deleted file system items in a separate event: no need to 223 // try to propagate this event. 224 continue; 225 } 226 // Let's try to adapt the document as a FileSystemItem to 227 // provide it to the FileSystemItemChange entry. 228 change = getFileSystemItemChange(session, docRef, entry, null); 229 if (change == null) { 230 // Non-adaptable documents are ignored 231 if (log.isDebugEnabled()) { 232 log.debug(String.format( 233 "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.", 234 docRef)); 235 } 236 } else { 237 if (log.isDebugEnabled()) { 238 log.debug(String.format( 239 "Adding FileSystemItemChange entry for document %s to the change summary.", docRef)); 240 } 241 changes.add(change); 242 } 243 } 244 } 245 return changes; 246 } 247 248 /** 249 * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility. 250 * <p> 251 * Return the current time to query the logDate field of the audit log. This time intentionally truncated to 0 252 * milliseconds to have a consistent behavior across databases. 253 * <p> 254 * Should now use last available log id in the audit log table as upper bound. 255 * 256 * @see https://jira.nuxeo.com/browse/NXP-14826 257 * @see #getUpperBound() 258 */ 259 @Override 260 public long getCurrentDate() { 261 long now = System.currentTimeMillis(); 262 return now - (now % 1000); 263 } 264 265 /** 266 * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event 267 * log id range clause in the change query. 268 */ 269 @Override 270 @SuppressWarnings("unchecked") 271 public long getUpperBound() { 272 AuditReader auditService = Framework.getService(AuditReader.class); 273 String auditQuery = "from LogEntry log order by log.id desc"; 274 if (log.isDebugEnabled()) { 275 if (log.isDebugEnabled()) { 276 log.debug("Querying audit log for greatest id: " + auditQuery); 277 } 278 } 279 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1); 280 if (entries.isEmpty()) { 281 if (log.isDebugEnabled()) { 282 log.debug("Found no audit log entries, returning -1"); 283 } 284 return -1; 285 } 286 return entries.get(0).getId(); 287 } 288 289 @SuppressWarnings("unchecked") 290 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 291 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 292 AuditReader auditService = Framework.getLocalService(AuditReader.class); 293 // Set fixed query parameters 294 Map<String, Object> params = new HashMap<String, Object>(); 295 params.put("repositoryId", session.getRepositoryName()); 296 297 // Build query and set dynamic parameters 298 StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where "); 299 auditQuerySb.append("log.repositoryId = :repositoryId"); 300 auditQuerySb.append(" and "); 301 auditQuerySb.append("("); 302 if (!activeRoots.getPaths().isEmpty()) { 303 // detect changes under the currently active roots for the 304 // current user 305 auditQuerySb.append("("); 306 auditQuerySb.append("log.category = 'eventDocumentCategory'"); 307 // TODO: don't hardcode event ids (contribute them?) 308 auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')"); 309 auditQuerySb.append(" or "); 310 auditQuerySb.append("log.category = 'eventLifeCycleCategory'"); 311 auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' "); 312 auditQuerySb.append(") and ("); 313 auditQuerySb.append("("); 314 auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params)); 315 auditQuerySb.append(")"); 316 if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) { 317 auditQuerySb.append(" or ("); 318 auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params)); 319 auditQuerySb.append(")"); 320 } 321 auditQuerySb.append(") or "); 322 } 323 // Detect any root (un-)registration changes for the roots previously 324 // seen by the current user. 325 // Exclude 'rootUnregistered' since root unregistration is covered by a 326 // "deleted" virtual event. 327 auditQuerySb.append("("); 328 auditQuerySb.append("log.category = '"); 329 auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY); 330 auditQuerySb.append("' and log.eventId != 'rootUnregistered'"); 331 auditQuerySb.append(")"); 332 auditQuerySb.append(") and ("); 333 auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params)); 334 // we intentionally sort by eventDate even if the range filtering is 335 // done on the log id: eventDate is useful to reflect the ordering of 336 // events occurring inside the same transaction while the 337 // monotonic behavior of log id is useful for ensuring that consecutive 338 // range queries to the audit won't miss any events even when long 339 // running transactions are logged after a delay. 340 auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc"); 341 String auditQuery = auditQuerySb.toString(); 342 343 if (log.isDebugEnabled()) { 344 if (log.isDebugEnabled()) { 345 log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params); 346 } 347 } 348 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit); 349 350 // Post filter the output to remove (un)registration that are unrelated 351 // to the current user. 352 List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>(); 353 String principalName = session.getPrincipal().getName(); 354 for (LogEntry entry : entries) { 355 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 356 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 357 // ignore event that only impact other users 358 continue; 359 } 360 if (log.isDebugEnabled()) { 361 if (log.isDebugEnabled()) { 362 log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s", 363 entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(), 364 entry.getDocPath())); 365 } 366 } 367 postFilteredEntries.add(entry); 368 } 369 return postFilteredEntries; 370 } 371 372 protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) { 373 StringBuilder rootPathClause = new StringBuilder(); 374 int rootPathCount = 0; 375 for (String rootPath : rootPaths) { 376 rootPathCount++; 377 String rootPathParam = "rootPath" + rootPathCount; 378 if (rootPathClause.length() > 0) { 379 rootPathClause.append(" or "); 380 } 381 rootPathClause.append(String.format("log.docPath like :%s", rootPathParam)); 382 params.put(rootPathParam, rootPath + '%'); 383 384 } 385 return rootPathClause.toString(); 386 } 387 388 protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds, 389 Map<String, Object> params) { 390 String paramName = "collectionMemberIds"; 391 params.put(paramName, collectionSyncRootMemberIds); 392 return String.format("log.docUUID in (:%s)", paramName); 393 } 394 395 /** 396 * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826. 397 * <p> 398 * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated. 399 */ 400 protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds, 401 Map<String, Object> params) { 402 if (integerBounds) { 403 params.put("lowerBound", lowerBound); 404 params.put("upperBound", upperBound); 405 return "log.id > :lowerBound and log.id <= :upperBound"; 406 } else { 407 params.put("lastSuccessfulSyncDate", new Date(lowerBound)); 408 params.put("syncDate", new Date(upperBound)); 409 return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate"; 410 } 411 } 412 413 protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry, 414 String expectedFileSystemItemId) { 415 DocumentModel doc = session.getDocument(docRef); 416 // TODO: check the facet, last root change and list of roots 417 // to have a special handling for the roots. 418 FileSystemItem fsItem = null; 419 try { 420 fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc); 421 } catch (RootlessItemException e) { 422 // Can happen for an unregistered synchronization root that cannot 423 // be adapted as a FileSystemItem: nothing to do. 424 if (log.isDebugEnabled()) { 425 log.debug(String.format( 426 "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef)); 427 } 428 } 429 if (fsItem == null) { 430 if (log.isDebugEnabled()) { 431 log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef)); 432 } 433 return null; 434 } 435 if (expectedFileSystemItemId != null 436 && !fsItem.getId().endsWith( 437 AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) { 438 if (log.isDebugEnabled()) { 439 log.debug(String.format( 440 "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.", 441 fsItem.getId(), docRef, expectedFileSystemItemId)); 442 } 443 return null; 444 } 445 if (log.isDebugEnabled()) { 446 log.debug(String.format( 447 "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.", 448 docRef)); 449 } 450 // EventDate is able to reflect the ordering of the events 451 // inside a transaction (e.g. when several documents are 452 // created, updated, deleted at once) hence it's useful 453 // to pass that info to the client even though the change 454 // detection filtering is using the log id to have a 455 // guaranteed monotonic behavior that evenDate cannot 456 // guarantee when facing long transactions. 457 return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), 458 entry.getRepositoryId(), entry.getDocUUID(), fsItem); 459 } 460 461}