/*
 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */
package org.nuxeo.drive.service.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.drive.adapter.FileSystemItem;
import org.nuxeo.drive.adapter.RootlessItemException;
import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
import org.nuxeo.drive.service.FileSystemChangeFinder;
import org.nuxeo.drive.service.FileSystemItemAdapterService;
import org.nuxeo.drive.service.FileSystemItemChange;
import org.nuxeo.drive.service.NuxeoDriveEvents;
import org.nuxeo.drive.service.NuxeoDriveManager;
import org.nuxeo.drive.service.SynchronizationRoots;
import org.nuxeo.drive.service.TooManyChangesException;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
import org.nuxeo.ecm.platform.audit.api.AuditReader;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.runtime.api.Framework;

/**
 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
 * <p>
 * Changes are computed by querying the audit log for a range of entries (bounded either by log id or by log date),
 * then turning each matching {@link LogEntry} into a {@link FileSystemItemChange}.
 *
 * @author Antoine Taillefer
 */
public class AuditChangeFinder implements FileSystemChangeFinder {

    private static final long serialVersionUID = 1963018967324857522L;

    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);

    /** Parameters accumulated through {@link #handleParameters(Map)}. */
    protected Map<String, String> parameters = new HashMap<String, String>();

    /**
     * Merges the given parameters into the ones already held by this change finder. Existing keys are overwritten.
     *
     * @param parameters the parameters to add
     */
    @Override
    public void handleParameters(Map<String, String> parameters) {
        this.parameters.putAll(parameters);
    }

    /**
     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
     * for backward compatibility.
     * <p>
     * Now using event log id for lower and upper bounds to ensure consistency.
     * <p>
     * This variant filters the audit entries on their log date and passes no collection sync root member ids.
     *
     * @param session the core session used to fetch documents and identify the current user and repository
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the currently active synchronization roots
     * @param lastSuccessfulSyncDate lower bound of the log date range, in milliseconds (inclusive)
     * @param syncDate upper bound of the log date range, in milliseconds (exclusive)
     * @param limit maximum number of audit entries to fetch
     * @return the list of changes found in the given range
     * @throws TooManyChangesException if the number of audit entries found reaches {@code limit}
     * @see https://jira.nuxeo.com/browse/NXP-14826
     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
     */
    @Override
    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
            throws TooManyChangesException {
        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
                false, limit);
    }

    /**
     * Finds the changes whose audit log id is in the range {@code ]lowerBound, upperBound]}.
     *
     * @param session the core session used to fetch documents and identify the current user and repository
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the currently active synchronization roots
     * @param collectionSyncRootMemberIds ids of the documents that are members of a collection sync root, may be
     *            {@code null} or empty
     * @param lowerBound lower bound of the log id range (exclusive)
     * @param upperBound upper bound of the log id range (inclusive)
     * @param limit maximum number of audit entries to fetch
     * @return the list of changes found in the given range
     * @throws TooManyChangesException if the number of audit entries found reaches {@code limit}
     */
    @Override
    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, true, limit);
    }

    /**
     * Common implementation behind the date based and the log id based public methods.
     * <p>
     * The audit log is queried once with the given roots. If one of the returned entries belongs to the Nuxeo Drive
     * event category, the synchronization root caches of the current user are invalidated and the audit log is queried
     * a second time with the refreshed roots. Each resulting entry is then converted to a
     * {@link FileSystemItemChange}, or skipped when it cannot or must not be propagated.
     *
     * @param session the core session used to fetch documents and identify the current user and repository
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the currently active synchronization roots
     * @param collectionSyncRootMemberIds ids of the documents that are members of a collection sync root, may be
     *            {@code null} or empty
     * @param lowerBound lower bound of the range, either a log id or a date in milliseconds
     * @param upperBound upper bound of the range, either a log id or a date in milliseconds
     * @param integerBounds {@code true} to filter on the log id, {@code false} to filter on the log date
     * @param limit maximum number of audit entries to fetch
     * @return the list of changes found in the given range
     * @throws TooManyChangesException if the number of audit entries found reaches {@code limit}
     */
    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound,
            boolean integerBounds, int limit) throws TooManyChangesException {
        String principalName = session.getPrincipal().getName();
        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();

        // Note: lastActiveRootRefs is not used: we could remove it from the
        // public API and from the client as well but it might be useful to
        // optimize future alternative implementations of the
        // FileSystemChangeFinder component so it might be better to leave it
        // part of the public API as currently.

        // Find changes from the log under active roots or events that are
        // linked to the un-registration or deletion of formerly synchronized
        // roots
        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);

        // First pass over the entries to check if a "NuxeoDrive" event has
        // occurred during that period.
        // This event can be:
        // - a root registration
        // - a root unregistration
        // - a "deleted" transition
        // - an "undeleted" transition
        // - a removal
        // - a move to a non synchronization root
        // - a security update
        // Thus the list of active roots may have changed and the cache might
        // need to be invalidated: let's make sure we perform a
        // query with the actual active roots.
        for (LogEntry entry : entries) {
            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
                            + " invalidating the root cache and refetching the changes.", principalName));
                }
                NuxeoDriveManager driveManager = Framework.getService(NuxeoDriveManager.class);
                driveManager.invalidateSynchronizationRootsCache(principalName);
                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(
                        session.getPrincipal());
                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
                        session.getPrincipal()).get(session.getRepositoryName());
                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound,
                        upperBound, integerBounds, limit);
                // A single re-query is enough: stop at the first Drive event found.
                break;
            }
        }

        // NOTE(review): entries has already been post-filtered by queryAuditEntries, so its size can be lower than
        // the number of rows actually returned by the audit query. Confirm that a page truncated at limit cannot go
        // undetected by this check.
        if (entries.size() >= limit) {
            throw new TooManyChangesException("Too many changes found in the audit logs.");
        }
        for (LogEntry entry : entries) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Handling log entry %s", entry));
            }
            FileSystemItemChange change = null;
            DocumentRef docRef = new IdRef(entry.getDocUUID());
            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
            if (fsIdInfo != null) {
                // This document has been deleted, moved, is an unregistered synchronization root or its security has
                // been updated, we just know the FileSystemItem id and name.
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Found extended info in audit log entry: document has been deleted, moved,"
                            + " is an unregistered synchronization root or its security has been updated,"
                            + " we just know the FileSystemItem id and name."));
                }
                boolean isChangeSet = false;
                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
                // only in the case of a move or a security update.
                // This can succeed if this is a move to a synchronization root or a security update after which the
                // current user still has access to the document.
                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
                    if (change != null) {
                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
                            // virtual event.
                            if (log.isDebugEnabled()) {
                                log.debug(String.format(
                                        "Document %s (%s) has been moved to another synchronzation root, not adding entry to the change summary.",
                                        entry.getDocPath(), docRef));
                            }
                            continue;
                        }
                        isChangeSet = true;
                    }
                }
                if (!isChangeSet) {
                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
                    // to a non synchronization root, if its security has been updated denying access to the current
                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
                    // FileSystemItem id and name to the FileSystemItemChange entry.
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Document %s (%s) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
                                entry.getDocPath(), docRef));
                    }
                    String fsId = fsIdInfo.getValue(String.class);
                    String eventId;
                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                        // Move to a non synchronization root
                        eventId = NuxeoDriveEvents.DELETED_EVENT;
                    } else {
                        // Deletion, unregistration or security update
                        eventId = entry.getEventId();
                    }
                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
                            entry.getRepositoryId(), entry.getDocUUID(), fsId, null);
                }
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Adding FileSystemItemChange entry to the change summary: %s", change));
                }
                changes.add(change);
            } else {
                // No extended info in the audit log entry, this should not be a deleted document, a moved document, an
                // unregistered synchronization root nor a security update denying access to the current user.
                if (log.isDebugEnabled()) {
                    log.debug(String.format(
                            "No extended info found in audit log entry: this is not a deleted document, a moved document,"
                                    + " an unregistered synchronization root nor a security update denying access to the current user.",
                            docRef));
                }
                if (!session.exists(docRef)) {
                    if (log.isDebugEnabled()) {
                        log.debug(
                                String.format("Document %s (%s) doesn't exist, not adding entry to the change summary.",
                                        entry.getDocPath(), docRef));
                    }
                    // Deleted or non accessible documents are mapped to
                    // deleted file system items in a separate event: no need to
                    // try to propagate this event.
                    continue;
                }
                // Let's try to adapt the document as a FileSystemItem to
                // provide it to the FileSystemItemChange entry.
                change = getFileSystemItemChange(session, docRef, entry, null);
                if (change == null) {
                    // Non-adaptable documents are ignored
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Document %s (%s) is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
                                entry.getDocPath(), docRef));
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Adding FileSystemItemChange entry to the change summary: %s", change));
                    }
                    changes.add(change);
                }
            }
        }
        return changes;
    }

    /**
     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
     * <p>
     * Return the current time to query the logDate field of the audit log. This time is intentionally truncated to 0
     * milliseconds to have a consistent behavior across databases.
     * <p>
     * Should now use last available log id in the audit log table as upper bound.
     *
     * @return the current time in milliseconds, rounded down to the second
     * @see https://jira.nuxeo.com/browse/NXP-14826
     * @see #getUpperBound()
     */
    @Override
    public long getCurrentDate() {
        long now = System.currentTimeMillis();
        return now - (now % 1000);
    }

    /**
     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
     * log id range clause in the change query.
     *
     * @return the greatest audit log id, or -1 if the audit query returns no entry
     */
    @Override
    @SuppressWarnings("unchecked")
    public long getUpperBound() {
        AuditReader auditService = Framework.getService(AuditReader.class);
        String auditQuery = "from LogEntry log order by log.id desc";
        if (log.isDebugEnabled()) {
            log.debug("Querying audit log for greatest id: " + auditQuery);
        }
        // Entries are sorted by descending id and a single one is requested: the first result holds the greatest id.
        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
        if (entries.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Found no audit log entries, returning -1");
            }
            return -1;
        }
        return entries.get(0).getId();
    }

    /**
     * Returns the last available log id in the audit log table (primary key) considering events older than the last
     * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make
     * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
     *
     * @param repositoryNames the names of the repositories whose clustering configuration must be taken into account
     * @return the greatest matching audit log id, 0 if no entry matches the clustering date filter but the audit log
     *         is not empty, or -1 if no entry is found at all
     */
    @Override
    @SuppressWarnings("unchecked")
    public long getUpperBound(Set<String> repositoryNames) {
        long clusteringDelay = getClusteringDelay(repositoryNames);
        AuditReader auditService = Framework.getService(AuditReader.class);
        Map<String, Object> params = new HashMap<String, Object>();
        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log");
        // A delay of -1 means that clustering is not enabled for any of the given repositories.
        if (clusteringDelay > -1) {
            // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826
            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
            params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate));
            auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate");
        }
        auditQuerySb.append(" order by log.id desc");
        String auditQuery = auditQuerySb.toString();
        if (log.isDebugEnabled()) {
            log.debug("Querying audit log for greatest id: " + auditQuery + " with params: " + params);
        }
        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1);
        if (entries.isEmpty()) {
            if (clusteringDelay > -1) {
                // Check for existing entries without the clustering invalidation date filter to not return -1 in this
                // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
                // 0
                List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1);
                if (!allEntries.isEmpty()) {
                    log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
                    return 0;
                }
            }
            log.debug("Found no audit log entries, returning -1");
            return -1;
        }
        return entries.get(0).getId();
    }

    /**
     * Returns the longest clustering delay among the given repositories for which clustering is enabled.
     * <p>
     * Repositories for which no {@link RepositoryDescriptor} is found are ignored.
     *
     * @param repositoryNames the names of the repositories to inspect
     * @return the longest clustering delay, or -1 if clustering is enabled for none of the given repositories
     */
    protected long getClusteringDelay(Set<String> repositoryNames) {
        long clusteringDelay = -1;
        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
        for (String repositoryName : repositoryNames) {
            RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName);
            if (repositoryDescriptor == null) {
                // Not a VCS repository
                continue;
            }
            if (repositoryDescriptor.getClusteringEnabled()) {
                clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay());
            }
        }
        return clusteringDelay;
    }

    /**
     * Queries the audit log for the entries of the session's repository that are in the given range and that are
     * either document or lifecycle events under the active roots (or on collection sync root members), or Nuxeo Drive
     * events other than {@code rootUnregistered}.
     * <p>
     * The result is post-filtered to drop the entries carrying an {@code impactedUserName} extended info that does not
     * match the current user.
     *
     * @param session the core session providing the repository name and the current user
     * @param activeRoots the active synchronization roots whose paths are used to filter the entries
     * @param collectionSyncRootMemberIds ids of the documents that are members of a collection sync root, may be
     *            {@code null} or empty
     * @param lowerBound lower bound of the range, either a log id or a date in milliseconds
     * @param upperBound upper bound of the range, either a log id or a date in milliseconds
     * @param integerBounds {@code true} to filter on the log id, {@code false} to filter on the log date
     * @param limit page size passed to the audit query
     * @return the post-filtered audit entries, ordered as returned by the audit query
     */
    @SuppressWarnings("unchecked")
    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {
        AuditReader auditService = Framework.getService(AuditReader.class);
        // Set fixed query parameters
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("repositoryId", session.getRepositoryName());

        // Build query and set dynamic parameters
        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
        auditQuerySb.append("log.repositoryId = :repositoryId");
        auditQuerySb.append(" and ");
        auditQuerySb.append("(");
        if (!activeRoots.getPaths().isEmpty()) {
            // detect changes under the currently active roots for the
            // current user
            // NOTE(review): the category / event id clause below is not parenthesized per category, it presumably
            // relies on "and" binding tighter than "or" in the query language - confirm.
            auditQuerySb.append("(");
            auditQuerySb.append("log.category = 'eventDocumentCategory'");
            // TODO: don't hardcode event ids (contribute them?)
            auditQuerySb.append(
                    " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked')");
            auditQuerySb.append(" or ");
            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
            auditQuerySb.append(") and (");
            auditQuerySb.append("(");
            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
            auditQuerySb.append(")");
            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
                auditQuerySb.append(" or (");
                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
                auditQuerySb.append(")");
            }
            auditQuerySb.append(") or ");
        }
        // Detect any root (un-)registration changes for the roots previously
        // seen by the current user.
        // Exclude 'rootUnregistered' since root unregistration is covered by a
        // "deleted" virtual event.
        auditQuerySb.append("(");
        auditQuerySb.append("log.category = '");
        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
        auditQuerySb.append(")");
        auditQuerySb.append(") and (");
        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
        // we intentionally sort by eventDate even if the range filtering is
        // done on the log id: eventDate is useful to reflect the ordering of
        // events occurring inside the same transaction while the
        // monotonic behavior of log id is useful for ensuring that consecutive
        // range queries to the audit won't miss any events even when long
        // running transactions are logged after a delay.
        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
        String auditQuery = auditQuerySb.toString();

        if (log.isDebugEnabled()) {
            log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
        }
        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);

        // Post filter the output to remove (un)registration that are unrelated
        // to the current user.
        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
        String principalName = session.getPrincipal().getName();
        for (LogEntry entry : entries) {
            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
                // ignore event that only impact other users
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Change detected: %s", entry));
            }
            postFilteredEntries.add(entry);
        }
        return postFilteredEntries;
    }

    /**
     * Builds the query clause matching the audit entries whose document path starts with one of the given root paths.
     * <p>
     * One {@code rootPathN} named parameter is registered in {@code params} per root path, N starting at 1.
     *
     * @param rootPaths the paths of the active synchronization roots
     * @param params the query parameter map, filled with the generated named parameters
     * @return the {@code like} clauses joined with {@code or}, or an empty string if {@code rootPaths} is empty
     */
    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
        StringBuilder rootPathClause = new StringBuilder();
        int rootPathCount = 0;
        for (String rootPath : rootPaths) {
            rootPathCount++;
            String rootPathParam = "rootPath" + rootPathCount;
            if (rootPathClause.length() > 0) {
                rootPathClause.append(" or ");
            }
            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
            // NOTE(review): the pattern is the root path directly followed by '%' with no path separator, so it also
            // matches paths that merely share the same prefix - confirm this is acceptable.
            params.put(rootPathParam, rootPath + '%');

        }
        return rootPathClause.toString();
    }

    /**
     * Builds the query clause matching the audit entries whose document id is one of the given collection sync root
     * member ids.
     * <p>
     * The ids are registered in {@code params} under the {@code collectionMemberIds} named parameter.
     *
     * @param collectionSyncRootMemberIds ids of the documents that are members of a collection sync root
     * @param params the query parameter map, filled with the generated named parameter
     * @return the {@code in} clause on the document id
     */
    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
            Map<String, Object> params) {
        String paramName = "collectionMemberIds";
        params.put(paramName, collectionSyncRootMemberIds);
        return String.format("log.docUUID in (:%s)", paramName);
    }

    /**
     * Now using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826.
     * <p>
     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
     * <p>
     * With integer bounds the clause selects the log ids in {@code ]lowerBound, upperBound]}, otherwise it selects the
     * log dates in {@code [lowerBound, upperBound[}, the bounds being interpreted as milliseconds.
     *
     * @param lowerBound lower bound of the range, either a log id or a date in milliseconds
     * @param upperBound upper bound of the range, either a log id or a date in milliseconds
     * @param integerBounds {@code true} to filter on the log id, {@code false} to filter on the log date
     * @param params the query parameter map, filled with the two bound parameters
     * @return the range clause
     */
    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
            Map<String, Object> params) {
        if (integerBounds) {
            params.put("lowerBound", lowerBound);
            params.put("upperBound", upperBound);
            return "log.id > :lowerBound and log.id <= :upperBound";
        } else {
            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
            params.put("syncDate", new Date(upperBound));
            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
        }
    }

    /**
     * Tries to adapt the document referenced by the given audit entry as a {@link FileSystemItem} and to wrap it in a
     * {@link FileSystemItemChange}.
     *
     * @param session the core session used to fetch the document
     * @param docRef the reference of the document to adapt
     * @param entry the audit entry providing the event id, event date, repository id and document id of the change
     * @param expectedFileSystemItemId if not {@code null}, the adapted item id must end with the file system item id
     *            separator followed by this value, otherwise no change is returned
     * @return the change holding the adapted {@link FileSystemItem}, or {@code null} if the document is not adaptable
     *         or if the adapted item id doesn't match {@code expectedFileSystemItemId}
     */
    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
            String expectedFileSystemItemId) {
        DocumentModel doc = session.getDocument(docRef);
        // TODO: check the facet, last root change and list of roots
        // to have a special handling for the roots.
        FileSystemItem fsItem = null;
        try {
            // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo
            fsItem = Framework.getService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false,
                    false);
        } catch (RootlessItemException e) {
            // Can happen for an unregistered synchronization root that cannot
            // be adapted as a FileSystemItem: nothing to do.
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "RootlessItemException thrown while trying to adapt document %s (%s) as a FileSystemItem.",
                        entry.getDocPath(), docRef));
            }
        }
        if (fsItem == null) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Document %s (%s) is not adaptable as a FileSystemItem, returning null.",
                        entry.getDocPath(), docRef));
            }
            return null;
        }
        // Only the end of the adapted item id is compared with the expected id.
        if (expectedFileSystemItemId != null
                && !fsItem.getId()
                          .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "Id %s of FileSystemItem adapted from document %s (%s) doesn't match expected FileSystemItem id %s, returning null.",
                        fsItem.getId(), entry.getDocPath(), docRef, expectedFileSystemItemId));
            }
            return null;
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Document %s (%s) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
                    entry.getDocPath(), docRef));
        }
        // EventDate is able to reflect the ordering of the events
        // inside a transaction (e.g. when several documents are
        // created, updated, deleted at once) hence it's useful
        // to pass that info to the client even though the change
        // detection filtering is using the log id to have a
        // guaranteed monotonic behavior that eventDate cannot
        // guarantee when facing long transactions.
        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(),
                entry.getDocUUID(), fsItem);
    }

}