001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020package org.nuxeo.drive.elasticsearch; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequestBuilder; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchType; 033import org.elasticsearch.client.Client; 034import org.elasticsearch.index.query.AndFilterBuilder; 035import org.elasticsearch.index.query.BoolFilterBuilder; 036import org.elasticsearch.index.query.FilterBuilder; 037import org.elasticsearch.index.query.FilterBuilders; 038import org.elasticsearch.index.query.OrFilterBuilder; 039import org.elasticsearch.index.query.QueryBuilder; 040import org.elasticsearch.index.query.QueryBuilders; 041import org.elasticsearch.index.query.RangeFilterBuilder; 042import org.elasticsearch.index.query.TermsFilterBuilder; 043import org.elasticsearch.search.SearchHit; 044import org.elasticsearch.search.SearchHits; 045import org.elasticsearch.search.sort.SortOrder; 046import org.nuxeo.drive.service.SynchronizationRoots; 047import org.nuxeo.drive.service.impl.AuditChangeFinder; 048import org.nuxeo.ecm.core.api.CoreSession; 049import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 050import org.nuxeo.ecm.platform.audit.api.LogEntry; 051import org.nuxeo.elasticsearch.ElasticSearchConstants; 052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 053import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 054import org.nuxeo.runtime.api.Framework; 055 056/** 057 * Override the JPA audit based change finder to execute query in ES. 058 * <p> 059 * The structure of the query executed by the {@link AuditChangeFinder} is: 060 * 061 * <pre> 062 * from LogEntry log where log.repositoryId = :repositoryId 063 * 064 * + AND if ActiveRoots (activeRoots) NOT empty 065 * 066 * from LogEntry log where log.repositoryId = :repositoryId and ( 067 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 068 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 069 * 070 * 071 * if ActiveRoots EMPTY: 072 * 073 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 074 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 075 * 076 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 077 * log.repositoryId asc, log.eventDate desc 078 * </pre> 079 * 080 * @since 7.3 081 */ 082public class ESAuditChangeFinder extends AuditChangeFinder { 083 084 private static final long serialVersionUID = 1L; 085 086 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 087 088 protected Client esClient = null; 089 090 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 091 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 092 093 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 094 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 095 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 096 097 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 098 FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 099 upperBound, integerBounds, limit); 100 builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder)); 101 102 builder.addSort("repositoryId", SortOrder.ASC); 103 builder.addSort("eventDate", SortOrder.DESC); 104 105 List<LogEntry> entries = new ArrayList<>(); 106 SearchResponse searchResponse = builder.setSize(limit).execute().actionGet(); 107 for (SearchHit hit : searchResponse.getHits()) { 108 try { 109 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 110 } catch (IOException e) { 111 log.error("Error while reading Audit Entry from ES", e); 112 } 113 } 114 return entries; 115 } 116 117 protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 118 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 119 AndFilterBuilder filterBuilder = FilterBuilders.andFilter(); 120 121 // from LogEntry log where log.repositoryId = :repositoryId 122 FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName()); 123 filterBuilder.add(repositoryClauseFilter); 124 125 if (activeRoots.getPaths().isEmpty()) { 126 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 127 filterBuilder.add(getDriveLogsQueryClause()); 128 } else { 129 130 OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter(); 131 132 // LIST_DOC_EVENTS_IDS_QUERY 133 134 // (log.category = 'eventDocumentCategory' and (log.eventId = 135 // 'documentCreated' or log.eventId = 'documentModified' or 136 // log.eventId = 'documentMoved' or log.eventId = 137 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 138 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 139 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 140 // 'eventLifeCycleCategory' and log.eventId = 141 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 142 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 143 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 144 "documentUnlocked" }; 145 OrFilterBuilder orEventsFilter = FilterBuilders.orFilter(); 146 orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true)); 147 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, 148 true)); 149 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 150 151 // ROOT_PATHS log.docPath like :rootPath1 152 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 153 OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter(); 154 rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths())); 155 rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 156 157 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 158 // COLECTIONS_PATHS) 159 // or (log.category = 'NuxeoDrive' and log.eventId != 160 // 'rootUnregistered') ) 161 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter)); 162 } else { 163 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, 164 getCurrentRootsClause(activeRoots.getPaths()))); 165 } 166 167 orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause()); 168 169 filterBuilder.add(orFilterBuilderIfActiveRoots); 170 } 171 172 filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound)); 173 return filterBuilder; 174 175 } 176 177 protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 178 RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id"); 179 rangeFilter.gt(lowerBound); 180 rangeFilter.lte(upperBound); 181 return rangeFilter; 182 } 183 184 protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 185 return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds); 186 } 187 188 protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) { 189 OrFilterBuilder orFilterRoots = FilterBuilders.orFilter(); 190 for (String rootPath : rootPaths) { 191 orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath)); 192 } 193 return orFilterRoots; 194 } 195 196 protected BoolFilterBuilder getDriveLogsQueryClause() { 197 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 198 filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive")); 199 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered")); 200 return filterBuilder; 201 } 202 203 protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 204 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 205 filterBuilder.must(FilterBuilders.termFilter("category", category)); 206 if (eventIds != null && eventIds.length > 0) { 207 if (eventIds.length == 1) { 208 if (shouldMatch) { 209 filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0])); 210 } else { 211 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0])); 212 } 213 } else { 214 if (shouldMatch) { 215 filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds)); 216 } else { 217 filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds)); 218 } 219 } 220 } 221 return filterBuilder; 222 } 223 224 @Override 225 public long getUpperBound() { 226 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 227 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 228 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 229 // TODO refactor this to use max clause 230 builder.setQuery(QueryBuilders.matchAllQuery()); 231 builder.addSort("id", SortOrder.DESC); 232 builder.setSize(1); 233 SearchResponse searchResponse = builder.execute().actionGet(); 234 List<LogEntry> entries = new ArrayList<>(); 235 SearchHits hits = searchResponse.getHits(); 236 for (SearchHit hit : hits) { 237 try { 238 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 239 } catch (IOException e) { 240 log.error("Error while reading Audit Entry from ES", e); 241 } 242 } 243 return entries.size() > 0 ? entries.get(0).getId() : -1; 244 } 245 246 /** 247 * Returns the last available log id in the audit index considering events older than the last clustering 248 * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the 249 * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 250 */ 251 @Override 252 public long getUpperBound(Set<String> repositoryNames) { 253 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 254 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 255 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 256 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 257 long clusteringDelay = getClusteringDelay(repositoryNames); 258 if (clusteringDelay > -1) { 259 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 260 FilterBuilder filterBuilder = FilterBuilders.rangeFilter("logDate").lt( 261 new Date(lastClusteringInvalidationDate)); 262 builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder)); 263 } else { 264 builder.setQuery(queryBuilder); 265 } 266 builder.addSort("id", SortOrder.DESC); 267 builder.setSize(1); 268 SearchResponse searchResponse = builder.execute().actionGet(); 269 List<LogEntry> entries = new ArrayList<>(); 270 SearchHits hits = searchResponse.getHits(); 271 for (SearchHit hit : hits) { 272 try { 273 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 274 } catch (IOException e) { 275 log.error("Error while reading Audit Entry from ES", e); 276 } 277 } 278 if (entries.isEmpty()) { 279 if (clusteringDelay > -1) { 280 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 281 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 282 // 0 283 builder.setQuery(queryBuilder); 284 searchResponse = builder.execute().actionGet(); 285 if (searchResponse.getHits().iterator().hasNext()) { 286 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 287 return 0; 288 } 289 } 290 log.debug("Found no audit log entries, returning -1"); 291 return -1; 292 } 293 return entries.get(0).getId(); 294 } 295 296 @Override 297 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 298 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 299 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 300 upperBound, integerBounds, limit); 301 // Post filter the output to remove (un)registration that are unrelated 302 // to the current user. 303 // TODO move this to the ES query 304 List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>(); 305 String principalName = session.getPrincipal().getName(); 306 for (LogEntry entry : entries) { 307 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 308 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 309 // ignore event that only impact other users 310 continue; 311 } 312 if (log.isDebugEnabled()) { 313 if (log.isDebugEnabled()) { 314 log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s", 315 entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(), 316 entry.getDocPath())); 317 } 318 } 319 postFilteredEntries.add(entry); 320 } 321 return postFilteredEntries; 322 } 323 324 protected Client getClient() { 325 if (esClient == null) { 326 log.info("Activate Elasticsearch backend for Audit"); 327 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 328 esClient = esa.getClient(); 329 } 330 return esClient; 331 } 332 333 protected String getESIndexName() { 334 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 335 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 336 } 337}