001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <mcedica@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020package org.nuxeo.drive.elasticsearch; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.elasticsearch.action.search.SearchRequestBuilder; 030import org.elasticsearch.action.search.SearchResponse; 031import org.elasticsearch.action.search.SearchType; 032import org.elasticsearch.client.Client; 033import org.elasticsearch.index.query.AndFilterBuilder; 034import org.elasticsearch.index.query.BoolFilterBuilder; 035import org.elasticsearch.index.query.FilterBuilder; 036import org.elasticsearch.index.query.FilterBuilders; 037import org.elasticsearch.index.query.OrFilterBuilder; 038import org.elasticsearch.index.query.QueryBuilder; 039import org.elasticsearch.index.query.QueryBuilders; 040import org.elasticsearch.index.query.RangeFilterBuilder; 041import org.elasticsearch.index.query.TermsFilterBuilder; 042import org.elasticsearch.search.SearchHit; 043import org.elasticsearch.search.SearchHits; 044import org.elasticsearch.search.sort.SortOrder; 045import org.nuxeo.drive.service.SynchronizationRoots; 046import org.nuxeo.drive.service.impl.AuditChangeFinder; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 049import org.nuxeo.ecm.platform.audit.api.LogEntry; 050import org.nuxeo.elasticsearch.ElasticSearchConstants; 051import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 052import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 053import org.nuxeo.runtime.api.Framework; 054 055/** 056 * Override the JPA audit based change finder to execute query in ES. 057 * <p> 058 * The structure of the query executed by the {@link AuditChangeFinder} is: 059 * 060 * <pre> 061 * from LogEntry log where log.repositoryId = :repositoryId 062 * 063 * + AND if ActiveRoots (activeRoots) NOT empty 064 * 065 * from LogEntry log where log.repositoryId = :repositoryId and ( 066 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 067 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 068 * 069 * 070 * if ActiveRoots EMPTY: 071 * 072 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 073 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 074 * 075 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 076 * log.repositoryId asc, log.eventDate desc 077 * </pre> 078 * 079 * @since 7.3 080 */ 081public class ESAuditChangeFinder extends AuditChangeFinder { 082 083 private static final long serialVersionUID = 1L; 084 085 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 086 087 protected Client esClient = null; 088 089 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 090 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 091 092 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 093 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 094 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 095 096 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 097 FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 098 upperBound, integerBounds, limit); 099 builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder)); 100 101 builder.addSort("repositoryId", SortOrder.ASC); 102 builder.addSort("eventDate", SortOrder.DESC); 103 104 List<LogEntry> entries = new ArrayList<>(); 105 SearchResponse searchResponse = builder.setFrom(0).setSize(limit).execute().actionGet(); 106 for (SearchHit hit : searchResponse.getHits()) { 107 try { 108 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 109 } catch (IOException e) { 110 log.error("Error while reading Audit Entry from ES", e); 111 } 112 } 113 return entries; 114 } 115 116 protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 117 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 118 AndFilterBuilder filterBuilder = FilterBuilders.andFilter(); 119 120 // from LogEntry log where log.repositoryId = :repositoryId 121 FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName()); 122 filterBuilder.add(repositoryClauseFilter); 123 124 if (activeRoots.getPaths().isEmpty()) { 125 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 126 filterBuilder.add(getDriveLogsQueryClause()); 127 } else { 128 129 OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter(); 130 131 // LIST_DOC_EVENTS_IDS_QUERY 132 133 // (log.category = 'eventDocumentCategory' and (log.eventId = 134 // 'documentCreated' or log.eventId = 'documentModified' or 135 // log.eventId = 'documentMoved' or log.eventId = 136 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 137 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 138 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 139 // 'eventLifeCycleCategory' and log.eventId = 140 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 141 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 142 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 143 "documentUnlocked" }; 144 OrFilterBuilder orEventsFilter = FilterBuilders.orFilter(); 145 orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true)); 146 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, 147 true)); 148 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 149 150 // ROOT_PATHS log.docPath like :rootPath1 151 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 152 OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter(); 153 rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths())); 154 rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 155 156 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 157 // COLECTIONS_PATHS) 158 // or (log.category = 'NuxeoDrive' and log.eventId != 159 // 'rootUnregistered') ) 160 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter)); 161 } else { 162 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, 163 getCurrentRootsClause(activeRoots.getPaths()))); 164 } 165 166 orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause()); 167 168 filterBuilder.add(orFilterBuilderIfActiveRoots); 169 } 170 171 filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound)); 172 return filterBuilder; 173 174 } 175 176 protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 177 RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id"); 178 rangeFilter.gt(lowerBound); 179 rangeFilter.lte(upperBound); 180 return rangeFilter; 181 } 182 183 protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 184 return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds); 185 } 186 187 protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) { 188 OrFilterBuilder orFilterRoots = FilterBuilders.orFilter(); 189 for (String rootPath : rootPaths) { 190 orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath)); 191 } 192 return orFilterRoots; 193 } 194 195 protected BoolFilterBuilder getDriveLogsQueryClause() { 196 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 197 filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive")); 198 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered")); 199 return filterBuilder; 200 } 201 202 protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 203 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 204 filterBuilder.must(FilterBuilders.termFilter("category", category)); 205 if (eventIds != null && eventIds.length > 0) { 206 if (eventIds.length == 1) { 207 if (shouldMatch) { 208 filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0])); 209 } else { 210 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0])); 211 } 212 } else { 213 if (shouldMatch) { 214 filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds)); 215 } else { 216 filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds)); 217 } 218 } 219 } 220 return filterBuilder; 221 } 222 223 @Override 224 public long getUpperBound() { 225 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()) 226 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 227 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 228 // TODO refactor this to use max clause 229 builder.setQuery(QueryBuilders.matchAllQuery()); 230 builder.addSort("id", SortOrder.DESC); 231 builder.setFrom(0); 232 builder.setSize(1); 233 SearchResponse searchResponse = builder.execute().actionGet(); 234 List<LogEntry> entries = new ArrayList<>(); 235 SearchHits hits = searchResponse.getHits(); 236 for (SearchHit hit : hits) { 237 try { 238 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 239 } catch (IOException e) { 240 log.error("Error while reading Audit Entry from ES", e); 241 } 242 } 243 return entries.size() > 0 ? entries.get(0).getId() : -1; 244 } 245 246 @Override 247 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 248 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 249 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 250 upperBound, integerBounds, limit); 251 // Post filter the output to remove (un)registration that are unrelated 252 // to the current user. 253 // TODO move this to the ES query 254 List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>(); 255 String principalName = session.getPrincipal().getName(); 256 for (LogEntry entry : entries) { 257 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 258 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 259 // ignore event that only impact other users 260 continue; 261 } 262 if (log.isDebugEnabled()) { 263 if (log.isDebugEnabled()) { 264 log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s", 265 entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(), 266 entry.getDocPath())); 267 } 268 } 269 postFilteredEntries.add(entry); 270 } 271 return postFilteredEntries; 272 } 273 274 protected Client getClient() { 275 if (esClient == null) { 276 log.info("Activate Elasticsearch backend for Audit"); 277 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 278 esClient = esa.getClient(); 279 } 280 return esClient; 281 } 282 283 protected String getESIndexName() { 284 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 285 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 286 } 287}