001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020package org.nuxeo.drive.elasticsearch; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.elasticsearch.action.search.SearchRequestBuilder; 031import org.elasticsearch.action.search.SearchResponse; 032import org.elasticsearch.action.search.SearchType; 033import org.elasticsearch.client.Client; 034import org.elasticsearch.index.query.BoolQueryBuilder; 035import org.elasticsearch.index.query.QueryBuilder; 036import org.elasticsearch.index.query.QueryBuilders; 037import org.elasticsearch.index.query.RangeQueryBuilder; 038import org.elasticsearch.index.query.TermsQueryBuilder; 039import org.elasticsearch.search.SearchHit; 040import org.elasticsearch.search.SearchHits; 041import org.elasticsearch.search.sort.SortOrder; 042import org.nuxeo.drive.service.SynchronizationRoots; 043import org.nuxeo.drive.service.impl.AuditChangeFinder; 044import org.nuxeo.ecm.core.api.CoreSession; 045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 046import org.nuxeo.ecm.platform.audit.api.LogEntry; 047import org.nuxeo.elasticsearch.ElasticSearchConstants; 048import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 049import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 050import org.nuxeo.runtime.api.Framework; 051 052/** 053 * Override the JPA audit based change finder to execute query in ES. 054 * <p> 055 * The structure of the query executed by the {@link AuditChangeFinder} is: 056 * 057 * <pre> 058 * from LogEntry log where log.repositoryId = :repositoryId 059 * 060 * + AND if ActiveRoots (activeRoots) NOT empty 061 * 062 * from LogEntry log where log.repositoryId = :repositoryId and ( 063 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 064 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 065 * 066 * 067 * if ActiveRoots EMPTY: 068 * 069 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 070 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 071 * 072 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 073 * log.repositoryId asc, log.eventDate desc 074 * </pre> 075 * 076 * @since 7.3 077 */ 078public class ESAuditChangeFinder extends AuditChangeFinder { 079 080 private static final long serialVersionUID = 1L; 081 082 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 083 084 protected Client esClient = null; 085 086 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 087 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 088 int limit) { 089 090 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 091 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 092 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 093 094 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 095 QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 096 upperBound, integerBounds, limit); 097 builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 098 099 builder.addSort("repositoryId", SortOrder.ASC); 100 builder.addSort("eventDate", SortOrder.DESC); 101 102 List<LogEntry> entries = new ArrayList<>(); 103 logSearchRequest(builder); 104 SearchResponse searchResponse = builder.setSize(limit).execute().actionGet(); 105 logSearchResponse(searchResponse); 106 for (SearchHit hit : searchResponse.getHits()) { 107 try { 108 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 109 } catch (IOException e) { 110 log.error("Error while reading Audit Entry from ES", e); 111 } 112 } 113 return entries; 114 } 115 116 protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 117 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 118 int limit) { 119 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 120 121 // from LogEntry log where log.repositoryId = :repositoryId 122 QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName()); 123 filterBuilder.must(repositoryClauseFilter); 124 125 if (activeRoots.getPaths().isEmpty()) { 126 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 127 filterBuilder.must(getDriveLogsQueryClause()); 128 } else { 129 130 BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery(); 131 132 // LIST_DOC_EVENTS_IDS_QUERY 133 134 // (log.category = 'eventDocumentCategory' and (log.eventId = 135 // 'documentCreated' or log.eventId = 'documentModified' or 136 // log.eventId = 'documentMoved' or log.eventId = 137 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 138 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 139 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 140 // 'eventLifeCycleCategory' and log.eventId = 141 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 142 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 143 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 144 "documentUnlocked" }; 145 BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery(); 146 orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true)); 147 orEventsFilter.should( 148 getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true)); 149 orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 150 151 // ROOT_PATHS log.docPath like :rootPath1 152 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 153 BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery(); 154 rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths())); 155 rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 156 157 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 158 // COLECTIONS_PATHS) 159 // or (log.category = 'NuxeoDrive' and log.eventId != 160 // 'rootUnregistered') ) 161 orFilterBuilderIfActiveRoots.should( 162 QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter)); 163 } else { 164 orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must( 165 getCurrentRootsClause(activeRoots.getPaths()))); 166 } 167 168 orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause()); 169 170 filterBuilder.must(orFilterBuilderIfActiveRoots); 171 } 172 173 filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound)); 174 return filterBuilder; 175 176 } 177 178 protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 179 RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id"); 180 rangeFilter.gt(lowerBound); 181 rangeFilter.lte(upperBound); 182 return rangeFilter; 183 } 184 185 protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 186 return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds); 187 } 188 189 protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) { 190 BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery(); 191 for (String rootPath : rootPaths) { 192 orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath)); 193 } 194 return orFilterRoots; 195 } 196 197 protected BoolQueryBuilder getDriveLogsQueryClause() { 198 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 199 filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive")); 200 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered")); 201 return filterBuilder; 202 } 203 204 protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 205 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 206 filterBuilder.must(QueryBuilders.termQuery("category", category)); 207 if (eventIds != null && eventIds.length > 0) { 208 if (eventIds.length == 1) { 209 if (shouldMatch) { 210 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 211 } else { 212 filterBuilder.mustNot(QueryBuilders.termQuery("eventId", eventIds[0])); 213 } 214 } else { 215 if (shouldMatch) { 216 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 217 } else { 218 filterBuilder.mustNot(QueryBuilders.termsQuery("eventId", eventIds)); 219 } 220 } 221 } 222 return filterBuilder; 223 } 224 225 @Override 226 public long getUpperBound() { 227 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 228 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 229 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 230 // TODO refactor this to use max clause 231 builder.setQuery(QueryBuilders.matchAllQuery()); 232 builder.addSort("id", SortOrder.DESC); 233 builder.setSize(1); 234 logSearchRequest(builder); 235 SearchResponse searchResponse = builder.execute().actionGet(); 236 logSearchResponse(searchResponse); 237 List<LogEntry> entries = new ArrayList<>(); 238 SearchHits hits = searchResponse.getHits(); 239 for (SearchHit hit : hits) { 240 try { 241 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 242 } catch (IOException e) { 243 log.error("Error while reading Audit Entry from ES", e); 244 } 245 } 246 return entries.size() > 0 ? entries.get(0).getId() : -1; 247 } 248 249 /** 250 * Returns the last available log id in the audit index considering events older than the last clustering 251 * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the 252 * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 253 */ 254 @Override 255 public long getUpperBound(Set<String> repositoryNames) { 256 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 257 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 258 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 259 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 260 long clusteringDelay = getClusteringDelay(repositoryNames); 261 if (clusteringDelay > -1) { 262 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 263 RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate") 264 .lt(new Date(lastClusteringInvalidationDate)); 265 builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 266 } else { 267 builder.setQuery(queryBuilder); 268 } 269 builder.addSort("id", SortOrder.DESC); 270 builder.setSize(1); 271 logSearchRequest(builder); 272 SearchResponse searchResponse = builder.execute().actionGet(); 273 logSearchResponse(searchResponse); 274 List<LogEntry> entries = new ArrayList<>(); 275 SearchHits hits = searchResponse.getHits(); 276 for (SearchHit hit : hits) { 277 try { 278 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 279 } catch (IOException e) { 280 log.error("Error while reading Audit Entry from ES", e); 281 } 282 } 283 if (entries.isEmpty()) { 284 if (clusteringDelay > -1) { 285 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 286 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 287 // 0 288 builder.setQuery(queryBuilder); 289 logSearchRequest(builder); 290 searchResponse = builder.execute().actionGet(); 291 logSearchResponse(searchResponse); 292 if (searchResponse.getHits().iterator().hasNext()) { 293 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 294 return 0; 295 } 296 } 297 log.debug("Found no audit log entries, returning -1"); 298 return -1; 299 } 300 return entries.get(0).getId(); 301 } 302 303 @Override 304 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 305 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, 306 int limit) { 307 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 308 upperBound, integerBounds, limit); 309 // Post filter the output to remove (un)registration that are unrelated 310 // to the current user. 311 // TODO move this to the ES query 312 List<LogEntry> postFilteredEntries = new ArrayList<>(); 313 String principalName = session.getPrincipal().getName(); 314 for (LogEntry entry : entries) { 315 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 316 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 317 // ignore event that only impact other users 318 continue; 319 } 320 if (log.isDebugEnabled()) { 321 if (log.isDebugEnabled()) { 322 log.debug(String.format("Change detected: %s", entry)); 323 } 324 } 325 postFilteredEntries.add(entry); 326 } 327 return postFilteredEntries; 328 } 329 330 protected Client getClient() { 331 if (esClient == null) { 332 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 333 esClient = esa.getClient(); 334 } 335 return esClient; 336 } 337 338 protected String getESIndexName() { 339 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 340 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 341 } 342 343 protected void logSearchRequest(SearchRequestBuilder request) { 344 if (log.isDebugEnabled()) { 345 log.debug(String.format( 346 "Elasticsearch search request: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 347 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 348 } 349 } 350 351 protected void logSearchResponse(SearchResponse response) { 352 if (log.isDebugEnabled()) { 353 log.debug("Elasticsearch search response: " + response.toString()); 354 } 355 } 356}