/*
 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */
package org.nuxeo.drive.service.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.drive.adapter.FileSystemItem;
import org.nuxeo.drive.adapter.RootlessItemException;
import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem;
import org.nuxeo.drive.service.FileSystemChangeFinder;
import org.nuxeo.drive.service.FileSystemItemAdapterService;
import org.nuxeo.drive.service.FileSystemItemChange;
import org.nuxeo.drive.service.NuxeoDriveEvents;
import org.nuxeo.drive.service.NuxeoDriveManager;
import org.nuxeo.drive.service.SynchronizationRoots;
import org.nuxeo.drive.service.TooManyChangesException;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.platform.audit.api.AuditReader;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.runtime.api.Framework;

/**
 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}.
 * <p>
 * Changes are found by querying the audit log, either on a log date range (legacy) or on a log id range, and each
 * matching audit entry is turned into a {@link FileSystemItemChange}.
 *
 * @author Antoine Taillefer
 */
public class AuditChangeFinder implements FileSystemChangeFinder {

    private static final long serialVersionUID = 1963018967324857522L;

    private static final Log log = LogFactory.getLog(AuditChangeFinder.class);

    /** Parameters accumulated through {@link #handleParameters(Map)}. */
    protected Map<String, String> parameters = new HashMap<String, String>();

    /**
     * Merges the given parameters into the ones already held by this change finder.
     *
     * @param parameters the parameters to add; an existing key is overwritten
     */
    @Override
    public void handleParameters(Map<String, String> parameters) {
        this.parameters.putAll(parameters);
    }

    /**
     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping old method based on log date
     * for backward compatibility.
     * <p>
     * Now using event log id for lower and upper bounds to ensure consistency.
     *
     * @param session the core session used to check and fetch documents
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the synchronization roots used to filter the audit entries by path
     * @param lastSuccessfulSyncDate inclusive lower bound of the log date range, in milliseconds
     * @param syncDate exclusive upper bound of the log date range, in milliseconds
     * @param limit the maximum number of audit entries to fetch
     * @return the changes found in the given log date range
     * @throws TooManyChangesException if the number of audit entries kept reaches {@code limit}
     * @see <a href="https://jira.nuxeo.com/browse/NXP-14826">NXP-14826</a>
     * @see #getFileSystemChangesIntegerBounds(CoreSession, Set, SynchronizationRoots, Set, long, long, int)
     */
    @Override
    public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
            SynchronizationRoots activeRoots, long lastSuccessfulSyncDate, long syncDate, int limit)
            throws TooManyChangesException {
        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, null, lastSuccessfulSyncDate, syncDate,
                false, limit);
    }

    /**
     * Finds the changes whose audit log id is in the range {@code ]lowerBound, upperBound]}.
     *
     * @param session the core session used to check and fetch documents
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the synchronization roots used to filter the audit entries by path
     * @param collectionSyncRootMemberIds ids of the documents to also match by document id, may be {@code null}
     * @param lowerBound exclusive lower bound of the log id range
     * @param upperBound inclusive upper bound of the log id range
     * @param limit the maximum number of audit entries to fetch
     * @return the changes found in the given log id range
     * @throws TooManyChangesException if the number of audit entries kept reaches {@code limit}
     */
    @Override
    public List<FileSystemItemChange> getFileSystemChangesIntegerBounds(CoreSession session,
            Set<IdRef> lastActiveRootRefs, SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds,
            long lowerBound, long upperBound, int limit) throws TooManyChangesException {
        return getFileSystemChanges(session, lastActiveRootRefs, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, true, limit);
    }

    /**
     * Queries the audit log and converts the matching entries to {@link FileSystemItemChange}s.
     * <p>
     * If one of the entries belongs to the {@link NuxeoDriveEvents#EVENT_CATEGORY} category, the synchronization root
     * caches of the current user are invalidated and the audit log is queried a second time with the refreshed roots.
     *
     * @param session the core session used to check and fetch documents
     * @param lastActiveRootRefs not used by this implementation
     * @param activeRoots the synchronization roots used to filter the audit entries by path
     * @param collectionSyncRootMemberIds ids of the documents to also match by document id, may be {@code null}
     * @param lowerBound lower bound of the range, a log id or a date in milliseconds depending on
     *            {@code integerBounds}
     * @param upperBound upper bound of the range, a log id or a date in milliseconds depending on
     *            {@code integerBounds}
     * @param integerBounds {@code true} to filter on the log id, {@code false} to filter on the log date
     * @param limit the maximum number of audit entries to fetch
     * @return the changes built from the audit entries, in the order returned by the audit query
     * @throws TooManyChangesException if the number of audit entries kept reaches {@code limit}
     */
    protected List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs,
            SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound,
            long upperBound, boolean integerBounds, int limit) throws TooManyChangesException {
        String principalName = session.getPrincipal().getName();
        List<FileSystemItemChange> changes = new ArrayList<FileSystemItemChange>();

        // Note: lastActiveRootRefs is not used: we could remove it from the
        // public API and from the client as well but it might be useful to
        // optimize future alternative implementations of the
        // FileSystemChangeFinder component so it might be better to leave it
        // part of the public API as currently.

        // Find changes from the log under active roots or events that are
        // linked to the un-registration or deletion of formerly synchronized
        // roots
        List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);

        // First pass over the entries to check if a "NuxeoDrive" event has
        // occurred during that period.
        // This event can be:
        // - a root registration
        // - a root unregistration
        // - a "deleted" transition
        // - an "undeleted" transition
        // - a removal
        // - a move to a non synchronization root
        // - a security update
        // Thus the list of active roots may have changed and the cache might
        // need to be invalidated: let's make sure we perform a
        // query with the actual active roots.
        for (LogEntry entry : entries) {
            if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Detected sync root change for user '%s' in audit log:"
                            + " invalidating the root cache and refetching the changes.", principalName));
                }
                NuxeoDriveManager driveManager = Framework.getLocalService(NuxeoDriveManager.class);
                driveManager.invalidateSynchronizationRootsCache(principalName);
                driveManager.invalidateCollectionSyncRootMemberCache(principalName);
                Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots(session.getPrincipal());
                SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName());
                Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds(
                        session.getPrincipal()).get(session.getRepositoryName());
                entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds,
                        lowerBound, upperBound, integerBounds, limit);
                // A single re-query is enough, whatever the number of Drive events
                break;
            }
        }

        // NOTE(review): this compares the size of the list returned by queryAuditEntries, which has already dropped
        // the entries impacting other users only. If the audit page was full before that filtering, the truncation
        // would presumably go unnoticed here - to be confirmed.
        if (entries.size() >= limit) {
            throw new TooManyChangesException("Too many changes found in the audit logs.");
        }
        for (LogEntry entry : entries) {
            FileSystemItemChange change = null;
            DocumentRef docRef = new IdRef(entry.getDocUUID());
            ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId");
            if (fsIdInfo != null) {
                // This document has been deleted, moved, is an unregistered synchronization root or its security has
                // been updated, we just know the FileSystemItem id and name.
                if (log.isDebugEnabled()) {
                    log.debug(String.format(
                            "Found extended info in audit log entry, document %s has been deleted or is an unregistered synchronization root.",
                            docRef));
                }
                boolean isChangeSet = false;
                // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry,
                // only in the case of a move or a security update.
                // This can succeed if this is a move to a synchronization root or a security update after which the
                // current user still has access to the document.
                if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) {
                    change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class));
                    if (change != null) {
                        if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                            // A move to a synchronization root also fires a documentMoved event, don't propagate the
                            // virtual event.
                            if (log.isDebugEnabled()) {
                                log.debug(String.format(
                                        "Document %s has been moved to another synchronzation root, not adding entry to the change summary.",
                                        docRef));
                            }
                            continue;
                        }
                        isChangeSet = true;
                    }
                }
                if (!isChangeSet) {
                    // If the document has been deleted, is a regular unregistered synchronization root, has been moved
                    // to a non synchronization root, if its security has been updated denying access to the current
                    // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the
                    // FileSystemItem id and name to the FileSystemItemChange entry.
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Document %s doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.",
                                docRef));
                    }
                    String fsId = fsIdInfo.getValue(String.class);
                    // Guard against an entry holding the id but not the name: the change then carries a null name
                    // instead of the whole change summary failing with a NullPointerException.
                    ExtendedInfo fsNameInfo = entry.getExtendedInfos().get("fileSystemItemName");
                    String fsName = fsNameInfo == null ? null : fsNameInfo.getValue(String.class);
                    String eventId;
                    if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) {
                        // Move to a non synchronization root
                        eventId = NuxeoDriveEvents.DELETED_EVENT;
                    } else {
                        // Deletion, unregistration or security update
                        eventId = entry.getEventId();
                    }
                    change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(),
                            entry.getRepositoryId(), entry.getDocUUID(), fsId, fsName);
                }
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Adding FileSystemItemChange entry for document %s to the change summary.",
                            docRef));
                }
                changes.add(change);
            } else {
                // No extended info in the audit log entry, this should not be a
                // deleted document, nor an unregistered synchronization root
                // nor a security update denying access to the current user.
                if (log.isDebugEnabled()) {
                    log.debug(String.format(
                            "No extended info found in audit log entry, document %s has not been deleted nor is an unregistered synchronization root.",
                            docRef));
                }
                if (!session.exists(docRef)) {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Document %s doesn't exist, not adding entry to the change summary.",
                                docRef));
                    }
                    // Deleted or non accessible documents are mapped to
                    // deleted file system items in a separate event: no need to
                    // try to propagate this event.
                    continue;
                }
                // Let's try to adapt the document as a FileSystemItem to
                // provide it to the FileSystemItemChange entry.
                change = getFileSystemItemChange(session, docRef, entry, null);
                if (change == null) {
                    // Non-adaptable documents are ignored
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Document %s is not adaptable as a FileSystemItem, not adding any entry to the change summary.",
                                docRef));
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Adding FileSystemItemChange entry for document %s to the change summary.", docRef));
                    }
                    changes.add(change);
                }
            }
        }
        return changes;
    }

    /**
     * To be deprecated (in fact make throw {@link UnsupportedOperationException}), keeping for backward compatibility.
     * <p>
     * Return the current time to query the logDate field of the audit log. This time is intentionally truncated to 0
     * milliseconds to have a consistent behavior across databases.
     * <p>
     * Should now use last available log id in the audit log table as upper bound.
     *
     * @return the current time in milliseconds, rounded down to the second
     * @see <a href="https://jira.nuxeo.com/browse/NXP-14826">NXP-14826</a>
     * @see #getUpperBound()
     */
    @Override
    public long getCurrentDate() {
        long now = System.currentTimeMillis();
        return now - (now % 1000);
    }

    /**
     * Return the last available log id in the audit log table (primary key) to be used as the upper bound of the event
     * log id range clause in the change query.
     *
     * @return the id of the first entry returned by the descending id query, or {@code -1} if the query returns no
     *         entry
     */
    @Override
    @SuppressWarnings("unchecked")
    public long getUpperBound() {
        AuditReader auditService = Framework.getService(AuditReader.class);
        String auditQuery = "from LogEntry log order by log.id desc";
        if (log.isDebugEnabled()) {
            log.debug("Querying audit log for greatest id: " + auditQuery);
        }
        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1);
        if (entries.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Found no audit log entries, returning -1");
            }
            return -1;
        }
        return entries.get(0).getId();
    }

    /**
     * Builds and runs the audit query for the given roots and range, then drops the entries that only impact another
     * user.
     * <p>
     * An entry is dropped when it has an {@code impactedUserName} extended info whose value differs from the name of
     * the session principal.
     *
     * @param session the core session providing the repository name and the principal
     * @param activeRoots the synchronization roots whose paths are used to filter the document events
     * @param collectionSyncRootMemberIds ids of the documents to also match by document id, may be {@code null}
     * @param lowerBound lower bound of the range, see {@link #getJPARangeClause(long, long, boolean, Map)}
     * @param upperBound upper bound of the range, see {@link #getJPARangeClause(long, long, boolean, Map)}
     * @param integerBounds {@code true} to filter on the log id, {@code false} to filter on the log date
     * @param limit the limit passed to the audit query
     * @return the audit entries kept after the per user filtering
     */
    @SuppressWarnings("unchecked")
    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) {
        AuditReader auditService = Framework.getLocalService(AuditReader.class);
        // Set fixed query parameters
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("repositoryId", session.getRepositoryName());

        // Build query and set dynamic parameters
        StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where ");
        auditQuerySb.append("log.repositoryId = :repositoryId");
        auditQuerySb.append(" and ");
        auditQuerySb.append("(");
        if (!activeRoots.getPaths().isEmpty()) {
            // detect changes under the currently active roots for the
            // current user
            auditQuerySb.append("(");
            auditQuerySb.append("log.category = 'eventDocumentCategory'");
            // TODO: don't hardcode event ids (contribute them?)
            auditQuerySb.append(" and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished')");
            auditQuerySb.append(" or ");
            auditQuerySb.append("log.category = 'eventLifeCycleCategory'");
            auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ");
            auditQuerySb.append(") and (");
            auditQuerySb.append("(");
            auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params));
            auditQuerySb.append(")");
            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
                auditQuerySb.append(" or (");
                auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params));
                auditQuerySb.append(")");
            }
            auditQuerySb.append(") or ");
        }
        // Detect any root (un-)registration changes for the roots previously
        // seen by the current user.
        // Exclude 'rootUnregistered' since root unregistration is covered by a
        // "deleted" virtual event.
        auditQuerySb.append("(");
        auditQuerySb.append("log.category = '");
        auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY);
        auditQuerySb.append("' and log.eventId != 'rootUnregistered'");
        auditQuerySb.append(")");
        auditQuerySb.append(") and (");
        auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, integerBounds, params));
        // we intentionally sort by eventDate even if the range filtering is
        // done on the log id: eventDate is useful to reflect the ordering of
        // events occurring inside the same transaction while the
        // monotonic behavior of log id is useful for ensuring that consecutive
        // range queries to the audit won't miss any events even when long
        // running transactions are logged after a delay.
        auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc");
        String auditQuery = auditQuerySb.toString();

        if (log.isDebugEnabled()) {
            log.debug("Querying audit log for changes: " + auditQuery + " with params: " + params);
        }
        List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit);

        // Post filter the output to remove (un)registration that are unrelated
        // to the current user.
        List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>();
        String principalName = session.getPrincipal().getName();
        for (LogEntry entry : entries) {
            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
                // ignore event that only impact other users
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
                        entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
                        entry.getDocPath()));
            }
            postFilteredEntries.add(entry);
        }
        return postFilteredEntries;
    }

    /**
     * Builds the clause matching the entries whose document path starts with one of the given root paths.
     * <p>
     * One {@code rootPathN} named parameter is registered in {@code params} per root path, {@code N} starting at 1.
     *
     * @param rootPaths the root paths to match
     * @param params the query parameters, completed by this method
     * @return the {@code like} conditions joined with {@code or}, empty if {@code rootPaths} is empty
     */
    protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) {
        StringBuilder rootPathClause = new StringBuilder();
        int rootPathCount = 0;
        for (String rootPath : rootPaths) {
            rootPathCount++;
            String rootPathParam = "rootPath" + rootPathCount;
            if (rootPathClause.length() > 0) {
                rootPathClause.append(" or ");
            }
            rootPathClause.append(String.format("log.docPath like :%s", rootPathParam));
            params.put(rootPathParam, rootPath + '%');

        }
        return rootPathClause.toString();
    }

    /**
     * Builds the clause matching the entries whose document id is one of the given ids.
     *
     * @param collectionSyncRootMemberIds the document ids, registered in {@code params} as
     *            {@code collectionMemberIds}
     * @param params the query parameters, completed by this method
     * @return the {@code in} condition on the document id
     */
    protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds,
            Map<String, Object> params) {
        String paramName = "collectionMemberIds";
        params.put(paramName, collectionSyncRootMemberIds);
        return String.format("log.docUUID in (:%s)", paramName);
    }

    /**
     * Now using event log id to ensure consistency, see
     * <a href="https://jira.nuxeo.com/browse/NXP-14826">NXP-14826</a>.
     * <p>
     * Keeping ability to use old clause based on log date for backward compatibility, to be deprecated.
     *
     * @param lowerBound with integer bounds the exclusive lower log id, else the inclusive lower log date in
     *            milliseconds
     * @param upperBound with integer bounds the inclusive upper log id, else the exclusive upper log date in
     *            milliseconds
     * @param integerBounds {@code true} to build the clause on the log id, {@code false} to build it on the log date
     * @param params the query parameters, completed by this method with the two bounds
     * @return the range clause
     */
    protected String getJPARangeClause(long lowerBound, long upperBound, boolean integerBounds,
            Map<String, Object> params) {
        if (integerBounds) {
            params.put("lowerBound", lowerBound);
            params.put("upperBound", upperBound);
            return "log.id > :lowerBound and log.id <= :upperBound";
        } else {
            params.put("lastSuccessfulSyncDate", new Date(lowerBound));
            params.put("syncDate", new Date(upperBound));
            return "log.logDate >= :lastSuccessfulSyncDate and log.logDate < :syncDate";
        }
    }

    /**
     * Tries to adapt the given document as a {@link FileSystemItem} and to wrap it in a {@link FileSystemItemChange}.
     *
     * @param session the core session used to fetch the document
     * @param docRef the reference of the document to adapt
     * @param entry the audit entry providing the event id, event date, repository id and document id of the change
     * @param expectedFileSystemItemId if not {@code null}, the id of the adapted item must end with the
     *            {@link AbstractFileSystemItem#FILE_SYSTEM_ITEM_ID_SEPARATOR} followed by this value
     * @return the change, or {@code null} if the document cannot be adapted or if the id of the adapted item doesn't
     *         match the expected one
     */
    protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry,
            String expectedFileSystemItemId) {
        DocumentModel doc = session.getDocument(docRef);
        // TODO: check the facet, last root change and list of roots
        // to have a special handling for the roots.
        FileSystemItem fsItem = null;
        try {
            fsItem = Framework.getLocalService(FileSystemItemAdapterService.class).getFileSystemItem(doc);
        } catch (RootlessItemException e) {
            // Can happen for an unregistered synchronization root that cannot
            // be adapted as a FileSystemItem: nothing to do.
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "RootlessItemException thrown while trying to adapt document %s as a FileSystemItem.", docRef));
            }
        }
        if (fsItem == null) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Document %s is not adaptable as a FileSystemItem, returning null.", docRef));
            }
            return null;
        }
        if (expectedFileSystemItemId != null
                && !fsItem.getId().endsWith(
                        AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) {
            if (log.isDebugEnabled()) {
                log.debug(String.format(
                        "Id %s of FileSystemItem adapted from document %s doesn't match expected FileSystemItem id %s, returning null.",
                        fsItem.getId(), docRef, expectedFileSystemItemId));
            }
            return null;
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Document %s is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.",
                    docRef));
        }
        // EventDate is able to reflect the ordering of the events
        // inside a transaction (e.g. when several documents are
        // created, updated, deleted at once) hence it's useful
        // to pass that info to the client even though the change
        // detection filtering is using the log id to have a
        // guaranteed monotonic behavior that eventDate cannot
        // guarantee when facing long transactions.
        return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(),
                entry.getRepositoryId(), entry.getDocUUID(), fsItem);
    }

}