001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 */ 019package org.nuxeo.ecm.platform.audit.service; 020 021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 022import static org.nuxeo.ecm.platform.audit.service.LogEntryProvider.createProvider; 023 024import java.io.IOException; 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.function.Consumer; 034import java.util.function.Function; 035 036import org.nuxeo.ecm.core.api.CursorResult; 037import org.nuxeo.ecm.core.api.CursorService; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.ecm.core.api.ScrollResult; 040import org.nuxeo.ecm.core.persistence.PersistenceProvider; 041import org.nuxeo.ecm.core.persistence.PersistenceProviderFactory; 042import org.nuxeo.ecm.core.query.sql.model.OrderByExpr; 043import org.nuxeo.ecm.core.query.sql.model.OrderByExprs; 044import org.nuxeo.ecm.core.query.sql.model.QueryBuilder; 045import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 046import org.nuxeo.ecm.platform.audit.api.FilterMapEntry; 047import org.nuxeo.ecm.platform.audit.api.LogEntry; 048import org.nuxeo.ecm.platform.audit.impl.ExtendedInfoImpl; 049import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 050import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 051import org.nuxeo.runtime.api.Framework; 052import org.nuxeo.runtime.model.DefaultComponent; 053import org.nuxeo.runtime.transaction.TransactionHelper; 054 055import com.fasterxml.jackson.databind.ObjectMapper; 056 057/** 058 * Contains the Hibernate based (legacy) implementation 059 * 060 * @author tiry 061 */ 062public class DefaultAuditBackend extends AbstractAuditBackend { 063 064 protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 065 066 protected PersistenceProvider persistenceProvider; 067 068 protected CursorService<Iterator<LogEntry>, LogEntry, String> cursorService; 069 070 public DefaultAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 071 super(component, config); 072 activatePersistenceProvider(); 073 } 074 075 /** 076 * @since 9.3 077 */ 078 public DefaultAuditBackend() { 079 super(); 080 } 081 082 @Override 083 public int getApplicationStartedOrder() { 084 DefaultComponent component = (DefaultComponent) Framework.getRuntime().getComponent( 085 "org.nuxeo.ecm.core.persistence.PersistenceComponent"); 086 return component.getApplicationStartedOrder() + 1; 087 } 088 089 @Override 090 public void onApplicationStarted() { 091 activatePersistenceProvider(); 092 cursorService = new CursorService<>(entry -> { 093 try { 094 return OBJECT_MAPPER.writeValueAsString(entry); 095 } catch (IOException e) { 096 throw new NuxeoException("Unable to serialize entry"); 097 } 098 }); 099 } 100 101 @Override 102 public void onApplicationStopped() { 103 try { 104 persistenceProvider.closePersistenceUnit(); 105 } finally { 106 persistenceProvider = null; 107 } 108 } 109 110 // public for testing purpose ! 111 public PersistenceProvider getOrCreatePersistenceProvider() { 112 if (persistenceProvider == null) { 113 activatePersistenceProvider(); 114 } 115 return persistenceProvider; 116 } 117 118 protected void activatePersistenceProvider() { 119 Thread thread = Thread.currentThread(); 120 ClassLoader last = thread.getContextClassLoader(); 121 try { 122 thread.setContextClassLoader(PersistenceProvider.class.getClassLoader()); 123 PersistenceProviderFactory persistenceProviderFactory = Framework.getService( 124 PersistenceProviderFactory.class); 125 persistenceProvider = persistenceProviderFactory.newProvider("nxaudit-logs"); 126 persistenceProvider.openPersistenceUnit(); 127 } finally { 128 thread.setContextClassLoader(last); 129 } 130 } 131 132 protected <T> T apply(boolean needActivateSession, Function<LogEntryProvider, T> function) { 133 return getOrCreatePersistenceProvider().run(Boolean.valueOf(needActivateSession), em -> { 134 return function.apply(createProvider(em)); 135 }); 136 } 137 138 protected void accept(boolean needActivateSession, Consumer<LogEntryProvider> consumer) { 139 getOrCreatePersistenceProvider().run(Boolean.valueOf(needActivateSession), em -> { 140 consumer.accept(createProvider(em)); 141 }); 142 } 143 144 @Override 145 public void addLogEntries(final List<LogEntry> entries) { 146 if (entries.isEmpty()) { 147 return; 148 } 149 TransactionHelper.runInTransaction(() -> accept(true, provider -> provider.addLogEntries(entries))); 150 } 151 152 @Override 153 public List<LogEntry> getLogEntriesFor(final String uuid, final String repositoryId) { 154 return apply(false, provider -> provider.getLogEntriesFor(uuid, repositoryId)); 155 } 156 157 @Override 158 public List<LogEntry> getLogEntriesFor(final String uuid) { 159 return apply(false, provider -> provider.getLogEntriesFor(uuid)); 160 } 161 162 @Override 163 public List<LogEntry> getLogEntriesFor(final String uuid, final Map<String, FilterMapEntry> filterMap, 164 final boolean doDefaultSort) { 165 return apply(false, provider -> provider.getLogEntriesFor(uuid, filterMap, doDefaultSort)); 166 } 167 168 @Override 169 public LogEntry getLogEntryByID(final long id) { 170 return apply(false, provider -> provider.getLogEntryByID(id)); 171 } 172 173 @Override 174 public List<LogEntry> nativeQueryLogs(final String whereClause, final int pageNb, final int pageSize) { 175 return apply(false, provider -> provider.nativeQueryLogs(whereClause, pageNb, pageSize)); 176 } 177 178 @Override 179 public List<?> nativeQuery(final String query, final int pageNb, final int pageSize) { 180 return apply(false, provider -> provider.nativeQuery(query, pageNb, pageSize)); 181 } 182 183 @Override 184 public List<?> nativeQuery(final String query, final Map<String, Object> params, final int pageNb, 185 final int pageSize) { 186 return apply(false, provider -> provider.nativeQuery(query, params, pageNb, pageSize)); 187 } 188 189 @Override 190 public List<LogEntry> queryLogs(QueryBuilder builder) { 191 return apply(false, provider -> provider.queryLogs(builder)); 192 } 193 194 @Override 195 public List<LogEntry> queryLogs(final String[] eventIds, final String dateRange) { 196 return apply(false, provider -> provider.queryLogs(eventIds, dateRange)); 197 } 198 199 @Override 200 public List<LogEntry> queryLogsByPage(final String[] eventIds, final Date limit, final String[] category, 201 final String path, final int pageNb, final int pageSize) { 202 return apply(false, provider -> provider.queryLogsByPage(eventIds, limit, category, path, pageNb, pageSize)); 203 } 204 205 @Override 206 public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) { 207 return apply(false, provider -> syncLogCreationEntries(provider, repoId, path, recurs)); 208 } 209 210 @Override 211 public Long getEventsCount(final String eventId) { 212 return apply(false, provider -> provider.countEventsById(eventId)); 213 } 214 215 public List<String> getLoggedEventIds() { 216 return apply(false, LogEntryProvider::findEventIds); 217 } 218 219 @Override 220 public ExtendedInfo newExtendedInfo(Serializable value) { 221 return ExtendedInfoImpl.createExtendedInfo(value); 222 } 223 224 @Override 225 public long getLatestLogId(String repositoryId, String... eventIds) { 226 Map<String, Object> params = getParams(eventIds); 227 String paramNames = getParamNames(eventIds); 228 params.put("repoId", repositoryId); 229 String query = String.format("FROM LogEntry log" // 230 + " WHERE log.eventId IN (%s)" // 231 + " AND log.repositoryId = :repoId" // 232 + " ORDER BY log.id DESC", paramNames); 233 @SuppressWarnings("unchecked") 234 List<LogEntry> entries = (List<LogEntry>) nativeQuery(query, params, 1, 1); 235 return entries.isEmpty() ? 0 : entries.get(0).getId(); 236 } 237 238 @Override 239 @SuppressWarnings("unchecked") 240 public List<LogEntry> getLogEntriesAfter(long logIdOffset, int limit, String repositoryId, String... eventIds) { 241 Map<String, Object> params = getParams(eventIds); 242 String paramNames = getParamNames(eventIds); 243 params.put("repoId", repositoryId); 244 params.put("minId", Long.valueOf(logIdOffset)); 245 String query = String.format("FROM LogEntry log" // 246 + " WHERE log.id >= :minId" // 247 + " AND log.eventId IN (%s)" // 248 + " AND log.repositoryId = :repoId" // 249 + " ORDER BY log.id", paramNames); 250 return (List<LogEntry>) nativeQuery(query, params, 1, limit); 251 } 252 253 protected String getParamNames(String[] eventId) { 254 List<String> ret = new ArrayList<>(eventId.length); 255 for (String event : eventId) { 256 ret.add(":ev" + event); 257 } 258 return String.join(",", ret); 259 } 260 261 protected Map<String, Object> getParams(String[] eventId) { 262 HashMap<String, Object> ret = new HashMap<>(eventId.length); 263 for (String event : eventId) { 264 ret.put("ev" + event, event); 265 } 266 return ret; 267 } 268 269 @Override 270 public void append(List<String> jsonEntries) { 271 List<LogEntry> entries = new ArrayList<>(); 272 for (String json : jsonEntries) { 273 try { 274 LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class); 275 if (entry.getId() == 0) { 276 throw new NuxeoException("A json entry has an empty id. entry=" + json); 277 } 278 entries.add(entry); 279 } catch (IOException e) { 280 throw new NuxeoException("Unable to deserialize json entries", e); 281 } 282 } 283 accept(false, provider -> provider.append(entries)); 284 } 285 286 @Override 287 public ScrollResult<String> scroll(QueryBuilder builder, int batchSize, int keepAliveSeconds) { 288 // as we're using pages to scroll audit, we need to add an order to make results across pages deterministic 289 builder.orders(OrderByExprs.asc(LOG_ID), builder.orders().toArray(new OrderByExpr[0])); 290 String scrollId = cursorService.registerCursorResult( 291 new SQLAuditCursorResult(builder, batchSize, keepAliveSeconds)); 292 return scroll(scrollId); 293 } 294 295 @Override 296 public ScrollResult<String> scroll(String scrollId) { 297 return cursorService.scroll(scrollId); 298 } 299 300 public class SQLAuditCursorResult extends CursorResult<Iterator<LogEntry>, LogEntry> { 301 302 protected final QueryBuilder builder; 303 304 protected long pageNb; 305 306 protected boolean end; 307 308 public SQLAuditCursorResult(QueryBuilder builder, int batchSize, int keepAliveSeconds) { 309 super(Collections.emptyIterator(), batchSize, keepAliveSeconds); 310 this.builder = builder; 311 this.pageNb = 0; 312 } 313 314 @Override 315 public boolean hasNext() { 316 if (cursor == null || end) { 317 return false; 318 } else if (cursor.hasNext()) { 319 return true; 320 } else { 321 runNextPage(); 322 return !end; 323 } 324 } 325 326 @Override 327 public LogEntry next() { 328 if (cursor != null && !cursor.hasNext() && !end) { 329 // try to run a next scroll 330 runNextPage(); 331 } 332 return super.next(); 333 } 334 335 protected void runNextPage() { 336 builder.offset(pageNb++ * batchSize).limit(batchSize); 337 cursor = queryLogs(builder).iterator(); 338 end = !cursor.hasNext(); 339 } 340 341 @Override 342 public void close() { 343 end = true; 344 // Call super close to clear cursor 345 super.close(); 346 } 347 348 } 349 350}