001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020package org.nuxeo.drive.elasticsearch; 021 022import java.io.IOException; 023import java.time.ZonedDateTime; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequest; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchType; 033import org.elasticsearch.index.query.BoolQueryBuilder; 034import org.elasticsearch.index.query.QueryBuilder; 035import org.elasticsearch.index.query.QueryBuilders; 036import org.elasticsearch.index.query.RangeQueryBuilder; 037import org.elasticsearch.index.query.TermsQueryBuilder; 038import org.elasticsearch.search.SearchHit; 039import org.elasticsearch.search.SearchHits; 040import org.elasticsearch.search.builder.SearchSourceBuilder; 041import org.elasticsearch.search.sort.SortOrder; 042import org.nuxeo.drive.service.SynchronizationRoots; 043import org.nuxeo.drive.service.impl.AuditChangeFinder; 044import org.nuxeo.ecm.core.api.CoreSession; 045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 046import org.nuxeo.ecm.platform.audit.api.LogEntry; 047import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 048import org.nuxeo.elasticsearch.ElasticSearchConstants; 049import org.nuxeo.elasticsearch.api.ESClient; 050import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 051import org.nuxeo.runtime.api.Framework; 052 053import com.fasterxml.jackson.databind.ObjectMapper; 054 055/** 056 * Override the JPA audit based change finder to execute query in ES. 057 * <p> 058 * The structure of the query executed by the {@link AuditChangeFinder} is: 059 * 060 * <pre> 061 * from LogEntry log where log.repositoryId = :repositoryId 062 * 063 * + AND if ActiveRoots (activeRoots) NOT empty 064 * 065 * from LogEntry log where log.repositoryId = :repositoryId and ( 066 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 067 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 068 * 069 * 070 * if ActiveRoots EMPTY: 071 * 072 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 073 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 074 * 075 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 076 * log.repositoryId asc, log.eventDate desc 077 * </pre> 078 * 079 * @since 7.3 080 */ 081public class ESAuditChangeFinder extends AuditChangeFinder { 082 083 private static final long serialVersionUID = 1L; 084 085 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 086 087 protected ESClient esClient = null; 088 089 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 090 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 091 int limit) { 092 093 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 094 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 095 096 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 097 QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 098 upperBound, integerBounds, limit); 099 SearchSourceBuilder source = new SearchSourceBuilder().query( 100 QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 101 source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC); 102 source.size(limit); 103 request.source(source); 104 List<LogEntry> entries = new ArrayList<>(); 105 logSearchRequest(request); 106 SearchResponse searchResponse = getClient().search(request); 107 logSearchResponse(searchResponse); 108 ObjectMapper mapper = new ObjectMapper(); 109 for (SearchHit hit : searchResponse.getHits()) { 110 try { 111 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 112 } catch (IOException e) { 113 log.error("Error while reading Audit Entry from ES", e); 114 } 115 } 116 return entries; 117 } 118 119 protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 120 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 121 int limit) { 122 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 123 124 // from LogEntry log where log.repositoryId = :repositoryId 125 QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName()); 126 filterBuilder.must(repositoryClauseFilter); 127 128 if (activeRoots.getPaths().isEmpty()) { 129 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 130 filterBuilder.must(getDriveLogsQueryClause()); 131 } else { 132 133 BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery(); 134 135 // LIST_DOC_EVENTS_IDS_QUERY 136 137 // (log.category = 'eventDocumentCategory' and (log.eventId = 138 // 'documentCreated' or log.eventId = 'documentModified' or 139 // log.eventId = 'documentMoved' or log.eventId = 140 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 141 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 142 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 143 // 'eventLifeCycleCategory' and log.eventId = 144 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 145 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 146 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 147 "documentUnlocked" }; 148 BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery(); 149 orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true)); 150 orEventsFilter.should( 151 getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true)); 152 orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 153 154 // ROOT_PATHS log.docPath like :rootPath1 155 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 156 BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery(); 157 rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths())); 158 rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 159 160 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 161 // COLECTIONS_PATHS) 162 // or (log.category = 'NuxeoDrive' and log.eventId != 163 // 'rootUnregistered') ) 164 orFilterBuilderIfActiveRoots.should( 165 QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter)); 166 } else { 167 orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must( 168 getCurrentRootsClause(activeRoots.getPaths()))); 169 } 170 171 orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause()); 172 173 filterBuilder.must(orFilterBuilderIfActiveRoots); 174 } 175 176 filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound)); 177 return filterBuilder; 178 179 } 180 181 protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 182 RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id"); 183 rangeFilter.gt(lowerBound); 184 rangeFilter.lte(upperBound); 185 return rangeFilter; 186 } 187 188 protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 189 return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds); 190 } 191 192 protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) { 193 BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery(); 194 for (String rootPath : rootPaths) { 195 orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath)); 196 } 197 return orFilterRoots; 198 } 199 200 protected BoolQueryBuilder getDriveLogsQueryClause() { 201 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 202 filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive")); 203 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered")); 204 return filterBuilder; 205 } 206 207 protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 208 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 209 filterBuilder.must(QueryBuilders.termQuery("category", category)); 210 if (eventIds != null && eventIds.length > 0) { 211 if (eventIds.length == 1) { 212 if (shouldMatch) { 213 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 214 } else { 215 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0])); 216 } 217 } else { 218 if (shouldMatch) { 219 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 220 } else { 221 filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds)); 222 } 223 } 224 } 225 return filterBuilder; 226 } 227 228 @Override 229 public long getUpperBound() { 230 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 231 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 232 // TODO refactor this to use max clause 233 request.source( 234 new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).sort("id", SortOrder.DESC).size(1)); 235 logSearchRequest(request); 236 SearchResponse searchResponse = getClient().search(request); 237 logSearchResponse(searchResponse); 238 List<LogEntry> entries = new ArrayList<>(); 239 SearchHits hits = searchResponse.getHits(); 240 ObjectMapper mapper = new ObjectMapper(); 241 for (SearchHit hit : hits) { 242 try { 243 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 244 } catch (IOException e) { 245 log.error("Error while reading Audit Entry from ES", e); 246 } 247 } 248 return entries.size() > 0 ? entries.get(0).getId() : -1; 249 } 250 251 /** 252 * Returns the last available log id in the audit index considering events older than the last clustering 253 * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the 254 * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 255 */ 256 @Override 257 public long getUpperBound(Set<String> repositoryNames) { 258 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 259 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 260 RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate"); 261 long clusteringDelay = getClusteringDelay(repositoryNames); 262 if (clusteringDelay > -1) { 263 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 264 filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate); 265 } 266 SearchSourceBuilder source = new SearchSourceBuilder(); 267 source.sort("id", SortOrder.DESC).size(1); 268 // scroll on previous days with a times 2 step up to 32 269 for (int i = 1; i <= 32; i = i * 2) { 270 ZonedDateTime lowerLogDateTime = ZonedDateTime.now().minusDays(i); 271 // set lower bound in query 272 filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli()); 273 source.query(QueryBuilders.boolQuery().filter(filterBuilder)); 274 request.source(source); 275 // run request 276 logSearchRequest(request); 277 SearchResponse searchResponse = getClient().search(request); 278 logSearchResponse(searchResponse); 279 280 // if results return the first hit id 281 ObjectMapper mapper = new ObjectMapper(); 282 SearchHits hits = searchResponse.getHits(); 283 for (SearchHit hit : hits) { 284 try { 285 return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId(); 286 } catch (IOException e) { 287 log.error("Error while reading Audit Entry from ES", e); 288 } 289 } 290 } 291 if (clusteringDelay > -1) { 292 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 293 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0 294 source.query(QueryBuilders.matchAllQuery()).size(0); 295 request.source(source); 296 logSearchRequest(request); 297 SearchResponse searchResponse = getClient().search(request); 298 logSearchResponse(searchResponse); 299 if (searchResponse.getHits().getTotalHits() > 0) { 300 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 301 return 0; 302 } 303 } 304 log.debug("Found no audit log entries, returning -1"); 305 return -1; 306 } 307 308 @Override 309 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 310 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 311 int limit) { 312 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 313 upperBound, integerBounds, limit); 314 // Post filter the output to remove (un)registration that are unrelated 315 // to the current user. 316 // TODO move this to the ES query 317 List<LogEntry> postFilteredEntries = new ArrayList<>(); 318 String principalName = session.getPrincipal().getName(); 319 for (LogEntry entry : entries) { 320 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 321 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 322 // ignore event that only impact other users 323 continue; 324 } 325 if (log.isDebugEnabled()) { 326 if (log.isDebugEnabled()) { 327 log.debug(String.format("Change detected: %s", entry)); 328 } 329 } 330 postFilteredEntries.add(entry); 331 } 332 return postFilteredEntries; 333 } 334 335 protected ESClient getClient() { 336 if (esClient == null) { 337 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 338 esClient = esa.getClient(); 339 } 340 return esClient; 341 } 342 343 protected String getESIndexName() { 344 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 345 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 346 } 347 348 protected void logSearchRequest(SearchRequest request) { 349 if (log.isDebugEnabled()) { 350 log.debug(String.format( 351 "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 352 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 353 } 354 } 355 356 protected void logSearchResponse(SearchResponse response) { 357 if (log.isDebugEnabled()) { 358 log.debug("Elasticsearch search response: " + response.toString()); 359 } 360 } 361}