001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 * Kevin Leturc <kleturc@nuxeo.com> 020 */ 021package org.nuxeo.drive.elasticsearch; 022 023import java.io.IOException; 024import java.time.ZonedDateTime; 025import java.time.temporal.ChronoUnit; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Set; 029 030import org.apache.logging.log4j.LogManager; 031import org.apache.logging.log4j.Logger; 032import org.elasticsearch.action.search.SearchRequest; 033import org.elasticsearch.action.search.SearchResponse; 034import org.elasticsearch.action.search.SearchType; 035import org.elasticsearch.index.query.BoolQueryBuilder; 036import org.elasticsearch.index.query.QueryBuilder; 037import org.elasticsearch.index.query.QueryBuilders; 038import org.elasticsearch.index.query.RangeQueryBuilder; 039import org.elasticsearch.index.query.TermsQueryBuilder; 040import org.elasticsearch.search.SearchHit; 041import org.elasticsearch.search.SearchHits; 042import org.elasticsearch.search.builder.SearchSourceBuilder; 043import org.elasticsearch.search.sort.SortOrder; 044import org.nuxeo.drive.service.SynchronizationRoots; 045import org.nuxeo.drive.service.impl.AuditChangeFinder; 046import org.nuxeo.ecm.core.api.CoreSession; 047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 048import org.nuxeo.ecm.platform.audit.api.LogEntry; 049import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 050import org.nuxeo.elasticsearch.ElasticSearchConstants; 051import org.nuxeo.elasticsearch.api.ESClient; 052import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 053import org.nuxeo.runtime.api.Framework; 054 055import com.fasterxml.jackson.databind.ObjectMapper; 056 057/** 058 * Override the JPA audit based change finder to execute query in ES. 059 * <p> 060 * The structure of the query executed by the {@link AuditChangeFinder} is: 061 * 062 * <pre> 063 * {@code 064 * from LogEntry log where log.repositoryId = :repositoryId 065 * 066 * + AND if ActiveRoots (activeRoots) NOT empty 067 * 068 * from LogEntry log where log.repositoryId = :repositoryId and ( 069 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 070 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 071 * 072 * 073 * if ActiveRoots EMPTY: 074 * 075 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 076 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 077 * 078 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 079 * log.repositoryId asc, log.eventDate desc 080 * } 081 * </pre> 082 * 083 * @since 7.3 084 */ 085public class ESAuditChangeFinder extends AuditChangeFinder { 086 087 private static final Logger log = LogManager.getLogger(ESAuditChangeFinder.class); 088 089 protected static final String EVENT_ID = "eventId"; 090 091 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 092 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 093 094 SearchRequest request = new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH); 095 096 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 097 QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 098 upperBound); 099 SearchSourceBuilder source = new SearchSourceBuilder().query( 100 QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 101 source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC); 102 source.size(limit); 103 request.source(source); 104 List<LogEntry> entries = new ArrayList<>(); 105 logSearchRequest(request); 106 SearchResponse searchResponse = getClient().search(request); 107 logSearchResponse(searchResponse); 108 ObjectMapper mapper = new ObjectMapper(); 109 for (SearchHit hit : searchResponse.getHits()) { 110 try { 111 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 112 } catch (IOException e) { 113 log.error("Error while reading Audit Entry from ES", e); 114 } 115 } 116 return entries; 117 } 118 119 protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 120 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound) { 121 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 122 123 // from LogEntry log where log.repositoryId = :repositoryId 124 QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName()); 125 filterBuilder.must(repositoryClauseFilter); 126 127 if (activeRoots.getPaths().isEmpty()) { 128 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 129 filterBuilder.must(getDriveLogsQueryClause()); 130 } else { 131 132 BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery(); 133 134 // LIST_DOC_EVENTS_IDS_QUERY 135 136 // (log.category = 'eventDocumentCategory' and (log.eventId = 137 // 'documentCreated' or log.eventId = 'documentModified' or 138 // log.eventId = 'documentMoved' or log.eventId = 139 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 140 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 141 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 142 // 'eventLifeCycleCategory' and log.eventId = 143 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 144 String[] eventIds = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 145 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 146 "documentUnlocked", "documentUntrashed" }; 147 BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery(); 148 orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true)); 149 orEventsFilter.should( 150 getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true)); 151 orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 152 153 // ROOT_PATHS log.docPath like :rootPath1 154 if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) { 155 BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery(); 156 rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths())); 157 rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 158 159 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 160 // COLECTIONS_PATHS) 161 // or (log.category = 'NuxeoDrive' and log.eventId != 162 // 'rootUnregistered') ) 163 orFilterBuilderIfActiveRoots.should( 164 QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter)); 165 } else { 166 orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must( 167 getCurrentRootsClause(activeRoots.getPaths()))); 168 } 169 170 orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause()); 171 172 filterBuilder.must(orFilterBuilderIfActiveRoots); 173 } 174 175 filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound)); 176 return filterBuilder; 177 178 } 179 180 protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 181 RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id"); 182 rangeFilter.gt(lowerBound); 183 rangeFilter.lte(upperBound); 184 return rangeFilter; 185 } 186 187 protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 188 return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds); 189 } 190 191 protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) { 192 BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery(); 193 for (String rootPath : rootPaths) { 194 orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath)); 195 } 196 return orFilterRoots; 197 } 198 199 protected BoolQueryBuilder getDriveLogsQueryClause() { 200 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 201 filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive")); 202 filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, "rootUnregistered")); 203 return filterBuilder; 204 } 205 206 protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 207 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 208 filterBuilder.must(QueryBuilders.termQuery("category", category)); 209 if (eventIds != null && eventIds.length > 0) { 210 if (eventIds.length == 1) { 211 if (shouldMatch) { 212 filterBuilder.must(QueryBuilders.termQuery(EVENT_ID, eventIds[0])); 213 } else { 214 filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, eventIds[0])); 215 } 216 } else { 217 if (shouldMatch) { 218 filterBuilder.must(QueryBuilders.termsQuery(EVENT_ID, eventIds)); 219 } else { 220 filterBuilder.mustNot(QueryBuilders.termsQuery(EVENT_ID, eventIds)); 221 } 222 } 223 } 224 return filterBuilder; 225 } 226 227 @Override 228 public long getUpperBound() { 229 SearchRequest request = new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH); 230 RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate"); 231 SearchSourceBuilder source = new SearchSourceBuilder(); 232 source.sort("id", SortOrder.DESC).size(1); 233 // scroll on previous days with a times 2 step up to 32 234 ESClient esClient = getClient(); 235 for (int i = 1; i <= 32; i = i * 2) { 236 ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i); 237 // set lower bound in query 238 filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli()); 239 source.query(QueryBuilders.boolQuery().filter(filterBuilder)); 240 request.source(source); 241 // run request 242 logSearchRequest(request); 243 SearchResponse searchResponse = esClient.search(request); 244 logSearchResponse(searchResponse); 245 246 // if results return the first hit id 247 ObjectMapper mapper = new ObjectMapper(); 248 SearchHits hits = searchResponse.getHits(); 249 for (SearchHit hit : hits) { 250 try { 251 return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId(); 252 } catch (IOException e) { 253 log.error("Error while reading Audit Entry from ES", e); 254 } 255 } 256 } 257 log.debug("Found no audit log entries, returning -1"); 258 return -1; 259 } 260 261 @Override 262 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 263 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 264 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 265 upperBound, limit); 266 // Post filter the output to remove (un)registration that are unrelated 267 // to the current user. 268 // TODO move this to the ES query 269 List<LogEntry> postFilteredEntries = new ArrayList<>(); 270 String principalName = session.getPrincipal().getName(); 271 for (LogEntry entry : entries) { 272 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 273 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 274 // ignore event that only impact other users 275 continue; 276 } 277 log.debug("Change detected: {}", entry); 278 postFilteredEntries.add(entry); 279 } 280 return postFilteredEntries; 281 } 282 283 protected ESClient getClient() { 284 return Framework.getService(ElasticSearchAdmin.class).getClient(); 285 } 286 287 protected String getESIndexName() { 288 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 289 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 290 } 291 292 protected void logSearchRequest(SearchRequest request) { 293 log.debug("Elasticsearch search request: curl -XGET 'http://localhost:9200/{}/_search?pretty' -d '{}'", 294 this::getESIndexName, () -> request); 295 } 296 297 protected void logSearchResponse(SearchResponse response) { 298 log.debug("Elasticsearch search response: {}", response); 299 } 300}