001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020package org.nuxeo.drive.elasticsearch; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import com.fasterxml.jackson.databind.ObjectMapper; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequest; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchType; 033import org.elasticsearch.index.query.BoolQueryBuilder; 034import org.elasticsearch.index.query.QueryBuilder; 035import org.elasticsearch.index.query.QueryBuilders; 036import org.elasticsearch.index.query.RangeQueryBuilder; 037import org.elasticsearch.index.query.TermsQueryBuilder; 038import org.elasticsearch.search.SearchHit; 039import org.elasticsearch.search.SearchHits; 040import org.elasticsearch.search.builder.SearchSourceBuilder; 041import org.elasticsearch.search.sort.SortOrder; 042import org.nuxeo.drive.service.SynchronizationRoots; 043import org.nuxeo.drive.service.impl.AuditChangeFinder; 044import org.nuxeo.ecm.core.api.CoreSession; 045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 046import org.nuxeo.ecm.platform.audit.api.LogEntry; 047import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 048import org.nuxeo.elasticsearch.ElasticSearchConstants; 049import org.nuxeo.elasticsearch.api.ESClient; 050import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 051import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 052import org.nuxeo.runtime.api.Framework; 053 054/** 055 * Override the JPA audit based change finder to execute query in ES. 056 * <p> 057 * The structure of the query executed by the {@link AuditChangeFinder} is: 058 * 059 * <pre> 060 * from LogEntry log where log.repositoryId = :repositoryId 061 * 062 * + AND if ActiveRoots (activeRoots) NOT empty 063 * 064 * from LogEntry log where log.repositoryId = :repositoryId and ( 065 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 066 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 067 * 068 * 069 * if ActiveRoots EMPTY: 070 * 071 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 072 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 073 * 074 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 075 * log.repositoryId asc, log.eventDate desc 076 * </pre> 077 * 078 * @since 7.3 079 */ 080public class ESAuditChangeFinder extends AuditChangeFinder { 081 082 private static final long serialVersionUID = 1L; 083 084 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 085 086 protected ESClient esClient = null; 087 088 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 089 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 090 int limit) { 091 092 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 093 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 094 095 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 096 QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 097 upperBound, integerBounds, limit); 098 SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 099 source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC); 100 source.size(limit); 101 request.source(source); 102 List<LogEntry> entries = new ArrayList<>(); 103 logSearchRequest(request); 104 SearchResponse searchResponse = getClient().search(request); 105 logSearchResponse(searchResponse); 106 ObjectMapper mapper = new ObjectMapper(); 107 for (SearchHit hit : searchResponse.getHits()) { 108 try { 109 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 110 } catch (IOException e) { 111 log.error("Error while reading Audit Entry from ES", e); 112 } 113 } 114 return entries; 115 } 116 117 protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 118 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 119 int limit) { 120 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 121 122 // from LogEntry log where log.repositoryId = :repositoryId 123 QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName()); 124 filterBuilder.must(repositoryClauseFilter); 125 126 if (activeRoots.getPaths().isEmpty()) { 127 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 128 filterBuilder.must(getDriveLogsQueryClause()); 129 } else { 130 131 BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery(); 132 133 // LIST_DOC_EVENTS_IDS_QUERY 134 135 // (log.category = 'eventDocumentCategory' and (log.eventId = 136 // 'documentCreated' or log.eventId = 'documentModified' or 137 // log.eventId = 'documentMoved' or log.eventId = 138 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 139 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 140 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 141 // 'eventLifeCycleCategory' and log.eventId = 142 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 143 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 144 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 145 "documentUnlocked" }; 146 BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery(); 147 orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true)); 148 orEventsFilter.should( 149 getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true)); 150 orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 151 152 // ROOT_PATHS log.docPath like :rootPath1 153 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 154 BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery(); 155 rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths())); 156 rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 157 158 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 159 // COLECTIONS_PATHS) 160 // or (log.category = 'NuxeoDrive' and log.eventId != 161 // 'rootUnregistered') ) 162 orFilterBuilderIfActiveRoots.should( 163 QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter)); 164 } else { 165 orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must( 166 getCurrentRootsClause(activeRoots.getPaths()))); 167 } 168 169 orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause()); 170 171 filterBuilder.must(orFilterBuilderIfActiveRoots); 172 } 173 174 filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound)); 175 return filterBuilder; 176 177 } 178 179 protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 180 RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id"); 181 rangeFilter.gt(lowerBound); 182 rangeFilter.lte(upperBound); 183 return rangeFilter; 184 } 185 186 protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 187 return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds); 188 } 189 190 protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) { 191 BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery(); 192 for (String rootPath : rootPaths) { 193 orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath)); 194 } 195 return orFilterRoots; 196 } 197 198 protected BoolQueryBuilder getDriveLogsQueryClause() { 199 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 200 filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive")); 201 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered")); 202 return filterBuilder; 203 } 204 205 protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 206 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 207 filterBuilder.must(QueryBuilders.termQuery("category", category)); 208 if (eventIds != null && eventIds.length > 0) { 209 if (eventIds.length == 1) { 210 if (shouldMatch) { 211 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 212 } else { 213 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0])); 214 } 215 } else { 216 if (shouldMatch) { 217 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 218 } else { 219 filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds)); 220 } 221 } 222 } 223 return filterBuilder; 224 } 225 226 @Override 227 public long getUpperBound() { 228 SearchRequest request = new SearchRequest(getESIndexName()) 229 .types(ElasticSearchConstants.ENTRY_TYPE) 230 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 231 // TODO refactor this to use max clause 232 request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) 233 .sort("id", SortOrder.DESC).size(1)); 234 logSearchRequest(request); 235 SearchResponse searchResponse = getClient().search(request); 236 logSearchResponse(searchResponse); 237 List<LogEntry> entries = new ArrayList<>(); 238 SearchHits hits = searchResponse.getHits(); 239 ObjectMapper mapper = new ObjectMapper(); 240 for (SearchHit hit : hits) { 241 try { 242 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 243 } catch (IOException e) { 244 log.error("Error while reading Audit Entry from ES", e); 245 } 246 } 247 return entries.size() > 0 ? entries.get(0).getId() : -1; 248 } 249 250 /** 251 * Returns the last available log id in the audit index considering events older than the last clustering 252 * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the 253 * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 254 */ 255 @Override 256 public long getUpperBound(Set<String> repositoryNames) { 257 SearchRequest request = new SearchRequest(getESIndexName()) 258 .types(ElasticSearchConstants.ENTRY_TYPE) 259 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 260 SearchSourceBuilder source = new SearchSourceBuilder(); 261 long clusteringDelay = getClusteringDelay(repositoryNames); 262 if (clusteringDelay > -1) { 263 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 264 RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate") 265 .lt(lastClusteringInvalidationDate); 266 source.query(QueryBuilders.boolQuery().filter(filterBuilder)); 267 } else { 268 source.query(QueryBuilders.matchAllQuery()); 269 } 270 source.sort("id", SortOrder.DESC).size(1); 271 request.source(source); 272 logSearchRequest(request); 273 SearchResponse searchResponse = getClient().search(request); 274 logSearchResponse(searchResponse); 275 List<LogEntry> entries = new ArrayList<>(); 276 SearchHits hits = searchResponse.getHits(); 277 ObjectMapper mapper = new ObjectMapper(); 278 for (SearchHit hit : hits) { 279 try { 280 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 281 } catch (IOException e) { 282 log.error("Error while reading Audit Entry from ES", e); 283 } 284 } 285 if (entries.isEmpty()) { 286 if (clusteringDelay > -1) { 287 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 288 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 289 // 0 290 source.query(QueryBuilders.matchAllQuery()); 291 request.source(source); 292 logSearchRequest(request); 293 searchResponse = getClient().search(request); 294 logSearchResponse(searchResponse); 295 if (searchResponse.getHits().iterator().hasNext()) { 296 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 297 return 0; 298 } 299 } 300 log.debug("Found no audit log entries, returning -1"); 301 return -1; 302 } 303 return entries.get(0).getId(); 304 } 305 306 @Override 307 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 308 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 309 int limit) { 310 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 311 upperBound, integerBounds, limit); 312 // Post filter the output to remove (un)registration that are unrelated 313 // to the current user. 314 // TODO move this to the ES query 315 List<LogEntry> postFilteredEntries = new ArrayList<>(); 316 String principalName = session.getPrincipal().getName(); 317 for (LogEntry entry : entries) { 318 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 319 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 320 // ignore event that only impact other users 321 continue; 322 } 323 if (log.isDebugEnabled()) { 324 if (log.isDebugEnabled()) { 325 log.debug(String.format("Change detected: %s", entry)); 326 } 327 } 328 postFilteredEntries.add(entry); 329 } 330 return postFilteredEntries; 331 } 332 333 protected ESClient getClient() { 334 if (esClient == null) { 335 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 336 esClient = esa.getClient(); 337 } 338 return esClient; 339 } 340 341 protected String getESIndexName() { 342 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 343 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 344 } 345 346 protected void logSearchRequest(SearchRequest request) { 347 if (log.isDebugEnabled()) { 348 log.debug(String.format( 349 "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 350 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 351 } 352 } 353 354 protected void logSearchResponse(SearchResponse response) { 355 if (log.isDebugEnabled()) { 356 log.debug("Elasticsearch search response: " + response.toString()); 357 } 358 } 359}