001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Mariana Cedica <mcedica@nuxeo.com> 016 * Antoine Taillefer <ataillefer@nuxeo.com> 017 */ 018package org.nuxeo.drive.elasticsearch; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Set; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.elasticsearch.action.search.SearchRequestBuilder; 028import org.elasticsearch.action.search.SearchResponse; 029import org.elasticsearch.action.search.SearchType; 030import org.elasticsearch.client.Client; 031import org.elasticsearch.index.query.AndFilterBuilder; 032import org.elasticsearch.index.query.BoolFilterBuilder; 033import org.elasticsearch.index.query.FilterBuilder; 034import org.elasticsearch.index.query.FilterBuilders; 035import org.elasticsearch.index.query.OrFilterBuilder; 036import org.elasticsearch.index.query.QueryBuilder; 037import org.elasticsearch.index.query.QueryBuilders; 038import org.elasticsearch.index.query.RangeFilterBuilder; 039import org.elasticsearch.index.query.TermsFilterBuilder; 040import org.elasticsearch.search.SearchHit; 041import org.elasticsearch.search.SearchHits; 042import org.elasticsearch.search.sort.SortOrder; 043import org.nuxeo.drive.service.SynchronizationRoots; 044import org.nuxeo.drive.service.impl.AuditChangeFinder; 045import org.nuxeo.ecm.core.api.CoreSession; 046import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 047import org.nuxeo.ecm.platform.audit.api.LogEntry; 048import org.nuxeo.elasticsearch.ElasticSearchConstants; 049import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 050import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 051import org.nuxeo.runtime.api.Framework; 052 053/** 054 * Override the JPA audit based change finder to execute query in ES. 055 * <p> 056 * The structure of the query executed by the {@link AuditChangeFinder} is: 057 * 058 * <pre> 059 * from LogEntry log where log.repositoryId = :repositoryId 060 * 061 * + AND if ActiveRoots (activeRoots) NOT empty 062 * 063 * from LogEntry log where log.repositoryId = :repositoryId and ( 064 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 065 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 066 * 067 * 068 * if ActiveRoots EMPTY: 069 * 070 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 071 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 072 * 073 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 074 * log.repositoryId asc, log.eventDate desc 075 * </pre> 076 * 077 * @since 7.3 078 */ 079public class ESAuditChangeFinder extends AuditChangeFinder { 080 081 private static final long serialVersionUID = 1L; 082 083 public static final Log log = LogFactory.getLog(ESAuditChangeFinder.class); 084 085 protected Client esClient = null; 086 087 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 088 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 089 090 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()).setTypes( 091 ElasticSearchConstants.ENTRY_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 092 093 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 094 FilterBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 095 upperBound, integerBounds, limit); 096 builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder)); 097 098 builder.addSort("repositoryId", SortOrder.ASC); 099 builder.addSort("eventDate", SortOrder.DESC); 100 101 List<LogEntry> entries = new ArrayList<>(); 102 SearchResponse searchResponse = builder.setFrom(0).setSize(limit).execute().actionGet(); 103 for (SearchHit hit : searchResponse.getHits()) { 104 try { 105 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 106 } catch (IOException e) { 107 log.error("Error while reading Audit Entry from ES", e); 108 } 109 } 110 return entries; 111 } 112 113 protected FilterBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 114 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 115 AndFilterBuilder filterBuilder = FilterBuilders.andFilter(); 116 117 // from LogEntry log where log.repositoryId = :repositoryId 118 FilterBuilder repositoryClauseFilter = FilterBuilders.termFilter("repositoryId", session.getRepositoryName()); 119 filterBuilder.add(repositoryClauseFilter); 120 121 if (activeRoots.getPaths().isEmpty()) { 122 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 123 filterBuilder.add(getDriveLogsQueryClause()); 124 } else { 125 126 OrFilterBuilder orFilterBuilderIfActiveRoots = FilterBuilders.orFilter(); 127 128 // LIST_DOC_EVENTS_IDS_QUERY 129 130 // (log.category = 'eventDocumentCategory' and (log.eventId = 131 // 'documentCreated' or log.eventId = 'documentModified' or 132 // log.eventId = 'documentMoved' or log.eventId = 133 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 134 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’) or log.category = 135 // 'eventLifeCycleCategory' and log.eventId = 136 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 137 String eventIds[] = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 138 "documentRestored", "addedToCollection", "documentProxyPublished" }; 139 OrFilterBuilder orEventsFilter = FilterBuilders.orFilter(); 140 orEventsFilter.add(getEventsClause("eventDocumentCategory", eventIds, true)); 141 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, 142 true)); 143 orEventsFilter.add(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 144 145 // ROOT_PATHS log.docPath like :rootPath1 146 if (collectionSyncRootMemberIds != null && collectionSyncRootMemberIds.size() > 0) { 147 OrFilterBuilder rootsOrCollectionsFilter = FilterBuilders.orFilter(); 148 rootsOrCollectionsFilter.add(getCurrentRootsClause(activeRoots.getPaths())); 149 rootsOrCollectionsFilter.add(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 150 151 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 152 // COLECTIONS_PATHS) 153 // or (log.category = 'NuxeoDrive' and log.eventId != 154 // 'rootUnregistered') ) 155 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, rootsOrCollectionsFilter)); 156 } else { 157 orFilterBuilderIfActiveRoots.add(FilterBuilders.andFilter(orEventsFilter, 158 getCurrentRootsClause(activeRoots.getPaths()))); 159 } 160 161 orFilterBuilderIfActiveRoots.add(getDriveLogsQueryClause()); 162 163 filterBuilder.add(orFilterBuilderIfActiveRoots); 164 } 165 166 filterBuilder.add(getLogIdBoundsClause(lowerBound, upperBound)); 167 return filterBuilder; 168 169 } 170 171 protected RangeFilterBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 172 RangeFilterBuilder rangeFilter = FilterBuilders.rangeFilter("id"); 173 rangeFilter.gt(lowerBound); 174 rangeFilter.lte(upperBound); 175 return rangeFilter; 176 } 177 178 protected TermsFilterBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 179 return FilterBuilders.termsFilter("docUUID", collectionSyncRootMemberIds); 180 } 181 182 protected OrFilterBuilder getCurrentRootsClause(Set<String> rootPaths) { 183 OrFilterBuilder orFilterRoots = FilterBuilders.orFilter(); 184 for (String rootPath : rootPaths) { 185 orFilterRoots.add(FilterBuilders.prefixFilter("docPath", rootPath)); 186 } 187 return orFilterRoots; 188 } 189 190 protected BoolFilterBuilder getDriveLogsQueryClause() { 191 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 192 filterBuilder.must(FilterBuilders.termFilter("category", "NuxeoDrive")); 193 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", "rootUnregistered")); 194 return filterBuilder; 195 } 196 197 protected BoolFilterBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 198 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 199 filterBuilder.must(FilterBuilders.termFilter("category", category)); 200 if (eventIds != null && eventIds.length > 0) { 201 if (eventIds.length == 1) { 202 if (shouldMatch) { 203 filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0])); 204 } else { 205 filterBuilder.mustNot(FilterBuilders.termFilter("eventId", eventIds[0])); 206 } 207 } else { 208 if (shouldMatch) { 209 filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds)); 210 } else { 211 filterBuilder.mustNot(FilterBuilders.termsFilter("eventId", eventIds)); 212 } 213 } 214 } 215 return filterBuilder; 216 } 217 218 @Override 219 public long getUpperBound() { 220 SearchRequestBuilder builder = getClient().prepareSearch(getESIndexName()).setTypes( 221 ElasticSearchConstants.ENTRY_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 222 // TODO refactor this to use max clause 223 builder.setQuery(QueryBuilders.matchAllQuery()); 224 builder.addSort("id", SortOrder.DESC); 225 builder.setFrom(0); 226 builder.setSize(1); 227 SearchResponse searchResponse = builder.execute().actionGet(); 228 List<LogEntry> entries = new ArrayList<>(); 229 SearchHits hits = searchResponse.getHits(); 230 for (SearchHit hit : hits) { 231 try { 232 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 233 } catch (IOException e) { 234 log.error("Error while reading Audit Entry from ES", e); 235 } 236 } 237 return entries.size() > 0 ? entries.get(0).getId() : -1; 238 } 239 240 @Override 241 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 242 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, boolean integerBounds, int limit) { 243 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 244 upperBound, integerBounds, limit); 245 // Post filter the output to remove (un)registration that are unrelated 246 // to the current user. 247 // TODO move this to the ES query 248 List<LogEntry> postFilteredEntries = new ArrayList<LogEntry>(); 249 String principalName = session.getPrincipal().getName(); 250 for (LogEntry entry : entries) { 251 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 252 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 253 // ignore event that only impact other users 254 continue; 255 } 256 if (log.isDebugEnabled()) { 257 if (log.isDebugEnabled()) { 258 log.debug(String.format("Change with eventId=%d detected at eventDate=%s, logDate=%s: %s on %s", 259 entry.getId(), entry.getEventDate(), entry.getLogDate(), entry.getEventId(), 260 entry.getDocPath())); 261 } 262 } 263 postFilteredEntries.add(entry); 264 } 265 return postFilteredEntries; 266 } 267 268 protected Client getClient() { 269 if (esClient == null) { 270 log.info("Activate Elasticsearch backend for Audit"); 271 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 272 esClient = esa.getClient(); 273 } 274 return esClient; 275 } 276 277 protected String getESIndexName() { 278 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 279 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 280 } 281}