/*
 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mariana Cedica <mcedica@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 *     Kevin Leturc <kleturc@nuxeo.com>
 */
package org.nuxeo.drive.elasticsearch;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.nuxeo.drive.service.SynchronizationRoots;
import org.nuxeo.drive.service.impl.AuditChangeFinder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
import org.nuxeo.elasticsearch.ElasticSearchConstants;
import org.nuxeo.elasticsearch.api.ESClient;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.runtime.api.Framework;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Override the JPA audit based change finder to execute query in ES.
 * <p>
 * The structure of the query executed by the {@link AuditChangeFinder} is:
 *
 * <pre>
 * from LogEntry log where log.repositoryId = :repositoryId
 *
 * + AND if ActiveRoots (activeRoots) NOT empty
 *
 * from LogEntry log where log.repositoryId = :repositoryId and (
 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS) or
 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
 *
 *
 * if ActiveRoots EMPTY:
 *
 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
 *
 * + AND (log.id &gt; :lowerBound and log.id &lt;= :upperBound) + order by
 * log.repositoryId asc, log.eventDate desc
 * </pre>
 *
 * @since 7.3
 */
public class ESAuditChangeFinder extends AuditChangeFinder {

    private static final long serialVersionUID = 1L;

    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);

    /** Shared JSON mapper used to turn Elasticsearch hits into audit entries; never reconfigured after creation. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Document event ids that are looked up in the {@code eventDocumentCategory} audit category. */
    private static final String[] DOCUMENT_EVENT_IDS = { "documentCreated", "documentModified", "documentMoved",
            "documentCreatedByCopy", "documentRestored", "addedToCollection", "documentProxyPublished",
            "documentLocked", "documentUnlocked" };

    /**
     * Runs the audit query against the Elasticsearch audit index and deserializes the hits.
     * <p>
     * The results are filtered by {@link #buildFilterClauses}, sorted by {@code repositoryId} ascending then
     * {@code eventDate} descending, and truncated to {@code limit} hits. A hit that cannot be deserialized is logged
     * and skipped.
     *
     * @param session the session providing the repository name used in the filter
     * @param activeRoots the active synchronization roots
     * @param collectionSyncRootMemberIds the ids of the documents that are members of a synchronized collection, may
     *            be {@code null}
     * @param lowerBound the exclusive lower bound on the log entry id
     * @param upperBound the inclusive upper bound on the log entry id
     * @param integerBounds passed through to {@link #buildFilterClauses}
     * @param limit the maximum number of hits to fetch
     * @return the deserialized audit entries, in the order returned by Elasticsearch
     */
    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {

        SearchRequest request = newAuditSearchRequest();

        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);
        SearchSourceBuilder source = new SearchSourceBuilder().query(
                QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
        source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC);
        source.size(limit);
        request.source(source);

        logSearchRequest(request);
        SearchResponse searchResponse = getClient().search(request);
        logSearchResponse(searchResponse);

        List<LogEntry> entries = new ArrayList<>();
        for (SearchHit hit : searchResponse.getHits()) {
            try {
                entries.add(MAPPER.readValue(hit.getSourceAsString(), LogEntryImpl.class));
            } catch (IOException e) {
                // Best effort: an unreadable hit must not prevent returning the other entries
                log.error("Error while reading Audit Entry from ES", e);
            }
        }
        return entries;
    }

    /**
     * Builds the Elasticsearch filter equivalent to the where clause described in the class documentation.
     *
     * @param session the session providing the repository name
     * @param activeRoots the active synchronization roots; when it has no path only the Drive specific log entries
     *            are selected
     * @param collectionSyncRootMemberIds the ids of the documents that are members of a synchronized collection, may
     *            be {@code null} or empty
     * @param lowerBound the exclusive lower bound on the log entry id
     * @param upperBound the inclusive upper bound on the log entry id
     * @param integerBounds not used by this implementation
     * @param limit not used by this implementation
     * @return the filter to apply to the audit search
     */
    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();

        // from LogEntry log where log.repositoryId = :repositoryId
        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
        filterBuilder.must(repositoryClauseFilter);

        if (activeRoots.getPaths().isEmpty()) {
            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
            filterBuilder.must(getDriveLogsQueryClause());
        } else {

            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();

            // LIST_DOC_EVENTS_IDS_QUERY

            // (log.category = 'eventDocumentCategory' and (log.eventId =
            // 'documentCreated' or log.eventId = 'documentModified' or
            // log.eventId = 'documentMoved' or log.eventId =
            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
            // log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId =
            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
            // 'eventLifeCycleCategory' and log.eventId =
            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
            orEventsFilter.should(getEventsClause("eventDocumentCategory", DOCUMENT_EVENT_IDS, true));
            orEventsFilter.should(
                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
            // NOTE(review): the JPA query above excludes log.docLifeCycle = 'deleted', whereas this clause excludes
            // eventId = 'deleted' and, being OR-ed, matches every 'eventLifeCycleCategory' entry whose event id is
            // not 'deleted'. Behavior kept as is; confirm whether a docLifeCycle filter was intended.
            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));

            // ROOT_PATHS log.docPath like :rootPath1
            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));

                // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or
                // COLLECTIONS_PATHS)
                // or (log.category = 'NuxeoDrive' and log.eventId !=
                // 'rootUnregistered') )
                orFilterBuilderIfActiveRoots.should(
                        QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter));
            } else {
                orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(
                        getCurrentRootsClause(activeRoots.getPaths())));
            }

            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());

            filterBuilder.must(orFilterBuilderIfActiveRoots);
        }

        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
        return filterBuilder;

    }

    /**
     * Builds the {@code lowerBound < id <= upperBound} range clause on the log entry id.
     *
     * @param lowerBound the exclusive lower bound
     * @param upperBound the inclusive upper bound
     * @return the range query on the {@code id} field
     */
    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
        rangeFilter.gt(lowerBound);
        rangeFilter.lte(upperBound);
        return rangeFilter;
    }

    /**
     * Builds the clause matching the log entries whose {@code docUUID} is one of the given ids.
     *
     * @param collectionSyncRootMemberIds the document ids to match
     * @return the terms query on the {@code docUUID} field
     */
    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
    }

    /**
     * Builds the clause matching the log entries whose {@code docPath} starts with one of the given root paths.
     *
     * @param rootPaths the synchronization root paths
     * @return a bool query holding one optional prefix clause per root path
     */
    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
        for (String rootPath : rootPaths) {
            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
        }
        return orFilterRoots;
    }

    /**
     * Builds the clause {@code category = 'NuxeoDrive' and eventId != 'rootUnregistered'}.
     *
     * @return the bool query selecting the Drive specific log entries
     */
    protected BoolQueryBuilder getDriveLogsQueryClause() {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
        return filterBuilder;
    }

    /**
     * Builds a clause on the given category, optionally restricted on the event id.
     *
     * @param category the value the {@code category} field must have
     * @param eventIds the event ids to test; when {@code null} or empty only the category is constrained
     * @param shouldMatch {@code true} to require the {@code eventId} field to be one of {@code eventIds},
     *            {@code false} to require it to be none of them
     * @return the resulting bool query
     */
    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        filterBuilder.must(QueryBuilders.termQuery("category", category));
        if (eventIds == null || eventIds.length == 0) {
            return filterBuilder;
        }
        // A single id is expressed as a term query, several ids as a terms query
        QueryBuilder eventIdQuery;
        if (eventIds.length == 1) {
            eventIdQuery = QueryBuilders.termQuery("eventId", eventIds[0]);
        } else {
            eventIdQuery = QueryBuilders.termsQuery("eventId", eventIds);
        }
        if (shouldMatch) {
            filterBuilder.must(eventIdQuery);
        } else {
            filterBuilder.mustNot(eventIdQuery);
        }
        return filterBuilder;
    }

    /**
     * Returns the upper bound computed by {@link #getUpperBound(Set)} for all the repository names known by the
     * {@link RepositoryManager}.
     */
    @Override
    public long getUpperBound() {
        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
        return getUpperBound(new HashSet<>(repositoryManager.getRepositoryNames()));
    }

    /**
     * Returns the last available log id in the audit index considering events older than the last clustering
     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
     * <p>
     * The index is searched on a {@code logDate} window starting 1, 2, 4, 8, 16 then 32 days before the beginning of
     * the current day, stopping at the first window that yields a readable entry.
     *
     * @return the id of the most recent matching entry, {@code 0} if the clustering delay filter is active and no
     *         entry matched although the index is not empty, {@code -1} otherwise
     */
    @Override
    public long getUpperBound(Set<String> repositoryNames) {
        SearchRequest request = newAuditSearchRequest();
        RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate");
        long clusteringDelay = getClusteringDelay(repositoryNames);
        if (clusteringDelay > -1) {
            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
            filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate);
        }
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.sort("id", SortOrder.DESC).size(1);
        // scroll on previous days with a times 2 step up to 32
        ESClient esClient = getClient();
        for (int i = 1; i <= 32; i = i * 2) {
            ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i);
            // set lower bound in query
            filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli());
            source.query(QueryBuilders.boolQuery().filter(filterBuilder));
            request.source(source);
            // run request
            logSearchRequest(request);
            SearchResponse searchResponse = esClient.search(request);
            logSearchResponse(searchResponse);

            // if results return the first hit id
            SearchHits hits = searchResponse.getHits();
            for (SearchHit hit : hits) {
                try {
                    return MAPPER.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId();
                } catch (IOException e) {
                    // An unreadable hit is skipped; the search goes on with a wider window
                    log.error("Error while reading Audit Entry from ES", e);
                }
            }
        }
        if (clusteringDelay > -1) {
            // Check for existing entries without the clustering invalidation date filter to not return -1 in this
            // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0
            source.query(QueryBuilders.matchAllQuery()).size(0);
            request.source(source);
            logSearchRequest(request);
            SearchResponse searchResponse = esClient.search(request);
            logSearchResponse(searchResponse);
            if (searchResponse.getHits().getTotalHits() > 0) {
                log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
                return 0;
            }
        }
        log.debug("Found no audit log entries, returning -1");
        return -1;
    }

    /**
     * Queries the audit entries from Elasticsearch, then drops the entries carrying an {@code impactedUserName}
     * extended info whose value differs from the name of the session principal.
     */
    @Override
    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {
        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);
        // Post filter the output to remove (un)registration that are unrelated
        // to the current user.
        // TODO move this to the ES query
        List<LogEntry> postFilteredEntries = new ArrayList<>();
        String principalName = session.getPrincipal().getName();
        for (LogEntry entry : entries) {
            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
                // ignore event that only impact other users
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Change detected: %s", entry));
            }
            postFilteredEntries.add(entry);
        }
        return postFilteredEntries;
    }

    /**
     * Returns the Elasticsearch client provided by the {@link ElasticSearchAdmin} service.
     */
    protected ESClient getClient() {
        return Framework.getService(ElasticSearchAdmin.class).getClient();
    }

    /**
     * Returns the name of the index that the {@link ElasticSearchAdmin} service associates with the audit entry type.
     */
    protected String getESIndexName() {
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
    }

    /**
     * Logs the given search request as a curl command line when the debug level is enabled.
     */
    protected void logSearchRequest(SearchRequest request) {
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request));
        }
    }

    /**
     * Logs the given search response when the debug level is enabled.
     */
    protected void logSearchResponse(SearchResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("Elasticsearch search response: " + response);
        }
    }

    /** Creates a search request targeting the audit index and entry type, using the DFS query-then-fetch mode. */
    private SearchRequest newAuditSearchRequest() {
        return new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
                                                  .searchType(SearchType.DFS_QUERY_THEN_FETCH);
    }
}