/*
 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Mariana Cedica <mcedica@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */
package org.nuxeo.drive.elasticsearch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.nuxeo.drive.service.SynchronizationRoots;
import org.nuxeo.drive.service.impl.AuditChangeFinder;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.elasticsearch.ElasticSearchConstants;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
import org.nuxeo.runtime.api.Framework;

/**
 * Override the JPA audit based change finder to execute query in ES.
 * <p>
 * The structure of the query executed by the {@link AuditChangeFinder} is:
 *
 * <pre>
 * from LogEntry log where log.repositoryId = :repositoryId
 *
 * + AND if ActiveRoots (activeRoots) NOT empty
 *
 * from LogEntry log where log.repositoryId = :repositoryId and (
 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS) or
 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
 *
 *
 * if ActiveRoots EMPTY:
 *
 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category =
 * 'NuxeoDrive' and log.eventId != 'rootUnregistered'))
 *
 * + AND (log.id &gt; :lowerBound and log.id &lt;= :upperBound) + order by
 * log.repositoryId asc, log.eventDate desc
 * </pre>
 *
 * @since 7.3
 */
public class ESAuditChangeFinder extends AuditChangeFinder {

    private static final long serialVersionUID = 1L;

    public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class);

    /** Elasticsearch client, lazily fetched by {@link #getClient()}. */
    protected Client esClient = null;

    /**
     * Runs the audit query against the Elasticsearch audit index and returns the matching entries.
     * <p>
     * The request matches all documents, is filtered by {@link #buildFilterClauses}, sorted by {@code repositoryId}
     * ascending then {@code eventDate} descending, and capped to {@code limit} hits. Hits that cannot be parsed are
     * logged and skipped.
     *
     * @param session the session providing the repository name used in the filter
     * @param activeRoots the active synchronization roots whose paths restrict the document events
     * @param collectionSyncRootMemberIds ids of documents synchronized through a collection, may be {@code null}
     * @param lowerBound exclusive lower bound on the log entry id
     * @param upperBound inclusive upper bound on the log entry id
     * @param integerBounds forwarded to {@link #buildFilterClauses}
     * @param limit maximum number of hits requested
     * @return the log entries successfully read from the search hits
     */
    protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {

        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);

        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);
        builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));

        builder.addSort("repositoryId", SortOrder.ASC);
        builder.addSort("eventDate", SortOrder.DESC);

        SearchResponse searchResponse = builder.setSize(limit).execute().actionGet();
        return readLogEntries(searchResponse);
    }

    /**
     * Converts the hits of a search response into audit log entries.
     * <p>
     * A hit whose JSON source cannot be read is logged at error level and left out of the result, so the returned list
     * may be shorter than the number of hits.
     *
     * @param searchResponse the Elasticsearch response to read the hits from
     * @return the entries that could be read, in hit order
     */
    private List<LogEntry> readLogEntries(SearchResponse searchResponse) {
        List<LogEntry> entries = new ArrayList<>();
        SearchHits hits = searchResponse.getHits();
        for (SearchHit hit : hits) {
            try {
                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
            } catch (IOException e) {
                log.error("Error while reading Audit Entry from ES", e);
            }
        }
        return entries;
    }

    /**
     * Builds the filter equivalent to the {@code where} clause described in the class documentation.
     *
     * @param session the session providing the repository name
     * @param activeRoots the active synchronization roots; when it has no path only the Drive logs clause is applied
     * @param collectionSyncRootMemberIds ids of documents synchronized through a collection, may be {@code null} or
     *            empty, in which case only the root paths restrict the document events
     * @param lowerBound exclusive lower bound on the log entry id
     * @param upperBound inclusive upper bound on the log entry id
     * @param integerBounds not used by this implementation
     * @param limit not used by this implementation
     * @return the boolean query to use as a filter
     */
    protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();

        // from LogEntry log where log.repositoryId = :repositoryId
        QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName());
        filterBuilder.must(repositoryClauseFilter);

        if (activeRoots.getPaths().isEmpty()) {
            // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered')
            filterBuilder.must(getDriveLogsQueryClause());
        } else {
            BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery();

            // LIST_DOC_EVENTS_IDS_QUERY

            // (log.category = 'eventDocumentCategory' and (log.eventId =
            // 'documentCreated' or log.eventId = 'documentModified' or
            // log.eventId = 'documentMoved' or log.eventId =
            // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or
            // log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId =
            // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category =
            // 'eventLifeCycleCategory' and log.eventId =
            // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' )
            String[] eventIds = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy",
                    "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked",
                    "documentUnlocked" };
            BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery();
            orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true));
            orEventsFilter.should(
                    getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true));
            // NOTE(review): the reference query above excludes entries whose docLifeCycle is 'deleted', whereas this
            // clause matches any 'eventLifeCycleCategory' entry whose eventId is not 'deleted' - confirm whether
            // this difference is intended before changing it.
            orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false));

            // ROOT_PATHS log.docPath like :rootPath1
            QueryBuilder pathsClause;
            if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) {
                // ( ROOT_PATHS or COLLECTIONS_PATHS )
                BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery();
                rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths()));
                rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds));
                pathsClause = rootsOrCollectionsFilter;
            } else {
                pathsClause = getCurrentRootsClause(activeRoots.getPaths());
            }

            // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLLECTIONS_PATHS )
            // or (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') )
            orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must(pathsClause));
            orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause());

            filterBuilder.must(orFilterBuilderIfActiveRoots);
        }

        filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound));
        return filterBuilder;
    }

    /**
     * Builds the range clause {@code lowerBound < id <= upperBound} on the log entry id.
     *
     * @param lowerBound exclusive lower bound
     * @param upperBound inclusive upper bound
     * @return the range query on the {@code id} field
     */
    protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) {
        RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id");
        rangeFilter.gt(lowerBound);
        rangeFilter.lte(upperBound);
        return rangeFilter;
    }

    /**
     * Builds the clause matching entries whose {@code docUUID} is one of the given ids.
     *
     * @param collectionSyncRootMemberIds the document ids to match
     * @return the terms query on the {@code docUUID} field
     */
    protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) {
        return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds);
    }

    /**
     * Builds the clause matching entries whose {@code docPath} starts with one of the given root paths.
     *
     * @param rootPaths the synchronization root paths
     * @return a boolean query with one {@code should} prefix clause per root path
     */
    protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) {
        BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery();
        for (String rootPath : rootPaths) {
            orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath));
        }
        return orFilterRoots;
    }

    /**
     * Builds the clause {@code category = 'NuxeoDrive' and eventId != 'rootUnregistered'}.
     *
     * @return the boolean query selecting the Drive specific log entries
     */
    protected BoolQueryBuilder getDriveLogsQueryClause() {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive"));
        filterBuilder.mustNot(QueryBuilders.termQuery("eventId", "rootUnregistered"));
        return filterBuilder;
    }

    /**
     * Builds a clause on the given category combined with a positive or negative match on event ids.
     * <p>
     * A single event id yields a term query, several yield a terms query. When {@code eventIds} is {@code null} or
     * empty only the category is constrained.
     *
     * @param category the value the {@code category} field must have
     * @param eventIds the event ids to match against the {@code eventId} field, may be {@code null} or empty
     * @param shouldMatch {@code true} to require one of the event ids, {@code false} to exclude them
     * @return the boolean query combining both constraints
     */
    protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) {
        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        filterBuilder.must(QueryBuilders.termQuery("category", category));
        if (eventIds == null || eventIds.length == 0) {
            return filterBuilder;
        }
        QueryBuilder eventIdClause;
        if (eventIds.length == 1) {
            eventIdClause = QueryBuilders.termQuery("eventId", eventIds[0]);
        } else {
            eventIdClause = QueryBuilders.termsQuery("eventId", eventIds);
        }
        if (shouldMatch) {
            filterBuilder.must(eventIdClause);
        } else {
            filterBuilder.mustNot(eventIdClause);
        }
        return filterBuilder;
    }

    /**
     * Returns the highest log entry id found in the audit index.
     *
     * @return the id of the entry with the greatest {@code id}, or -1 if no entry could be read
     */
    @Override
    public long getUpperBound() {
        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        // TODO refactor this to use max clause
        builder.setQuery(QueryBuilders.matchAllQuery());
        builder.addSort("id", SortOrder.DESC);
        builder.setSize(1);
        List<LogEntry> entries = readLogEntries(builder.execute().actionGet());
        return entries.isEmpty() ? -1 : entries.get(0).getId();
    }

    /**
     * Returns the last available log id in the audit index considering events older than the last clustering
     * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the
     * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh.
     * <p>
     * When the clustering delay returned by {@code getClusteringDelay} is greater than -1, only entries whose
     * {@code logDate} is earlier than the current time minus twice that delay are considered.
     *
     * @param repositoryNames the repositories used to compute the clustering delay
     * @return the greatest matching log entry id; 0 if the date filter excluded every entry although some exist; -1
     *         if no entry was found
     */
    @Override
    public long getUpperBound(Set<String> repositoryNames) {
        SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName())
                                                  .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                                                  .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        long clusteringDelay = getClusteringDelay(repositoryNames);
        boolean filterOnLogDate = clusteringDelay > -1;
        if (filterOnLogDate) {
            long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay;
            RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate")
                                                           .lt(new Date(lastClusteringInvalidationDate));
            builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
        } else {
            builder.setQuery(queryBuilder);
        }
        builder.addSort("id", SortOrder.DESC);
        builder.setSize(1);

        List<LogEntry> entries = readLogEntries(builder.execute().actionGet());
        if (!entries.isEmpty()) {
            return entries.get(0).getId();
        }

        if (filterOnLogDate) {
            // Check for existing entries without the clustering invalidation date filter to not return -1 in this
            // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >=
            // 0
            builder.setQuery(queryBuilder);
            SearchResponse unfilteredResponse = builder.execute().actionGet();
            if (unfilteredResponse.getHits().iterator().hasNext()) {
                log.debug("Found no audit log entries matching the criterias but some exist, returning 0");
                return 0;
            }
        }
        log.debug("Found no audit log entries, returning -1");
        return -1;
    }

    /**
     * Queries the audit entries from Elasticsearch, then drops the entries carrying an {@code impactedUserName}
     * extended info that differs from the name of the session's principal.
     *
     * @param session the session providing the repository name and the current principal
     * @param activeRoots the active synchronization roots
     * @param collectionSyncRootMemberIds ids of documents synchronized through a collection, may be {@code null}
     * @param lowerBound exclusive lower bound on the log entry id
     * @param upperBound inclusive upper bound on the log entry id
     * @param integerBounds forwarded to {@link #queryESAuditEntries}
     * @param limit maximum number of hits requested from Elasticsearch, applied before the post filtering
     * @return the entries that either have no impacted user or impact the current user
     */
    @Override
    protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots,
            Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds,
            int limit) {
        List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound,
                upperBound, integerBounds, limit);
        // Post filter the output to remove (un)registration that are unrelated
        // to the current user.
        // TODO move this to the ES query
        List<LogEntry> postFilteredEntries = new ArrayList<>();
        String principalName = session.getPrincipal().getName();
        for (LogEntry entry : entries) {
            ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName");
            if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) {
                // ignore event that only impact other users
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s",
                        entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(),
                        entry.getDocPath()));
            }
            postFilteredEntries.add(entry);
        }
        return postFilteredEntries;
    }

    /**
     * Returns the Elasticsearch client, fetching it from the {@link ElasticSearchAdmin} service on first use and
     * keeping it in {@link #esClient} afterwards.
     *
     * @return the Elasticsearch client
     */
    protected Client getClient() {
        if (esClient == null) {
            log.info("Activate Elasticsearch backend for Audit");
            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
            esClient = esa.getClient();
        }
        return esClient;
    }

    /**
     * Returns the name of the index registered for the audit entry type.
     *
     * @return the index name given by the {@link ElasticSearchAdmin} service for
     *         {@link ElasticSearchConstants#ENTRY_TYPE}
     */
    protected String getESIndexName() {
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
    }
}