001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.mongodb.audit; 020 021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY; 022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH; 023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID; 024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE; 025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID; 026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID; 028 029import java.io.IOException; 030import java.io.Serializable; 031import java.text.SimpleDateFormat; 032import java.util.ArrayList; 033import java.util.Calendar; 034import java.util.Date; 035import java.util.HashMap; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.TimeZone; 040import java.util.function.Function; 041import java.util.stream.Collectors; 042import java.util.stream.StreamSupport; 043 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.bson.Document; 047import org.bson.conversions.Bson; 048import org.nuxeo.common.utils.TextTemplate; 049import org.nuxeo.ecm.core.api.CursorService; 050import org.nuxeo.ecm.core.api.NuxeoException; 051import org.nuxeo.ecm.core.api.ScrollResult; 052import org.nuxeo.ecm.core.query.sql.model.Literals; 053import org.nuxeo.ecm.core.query.sql.model.MultiExpression; 054import org.nuxeo.ecm.core.query.sql.model.Operand; 055import org.nuxeo.ecm.core.query.sql.model.Operator; 056import org.nuxeo.ecm.core.query.sql.model.OrderByExpr; 057import org.nuxeo.ecm.core.query.sql.model.OrderByList; 058import org.nuxeo.ecm.core.query.sql.model.Predicate; 059import org.nuxeo.ecm.core.query.sql.model.Reference; 060import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 061import org.nuxeo.ecm.core.uidgen.UIDSequencer; 062import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder; 063import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 064import org.nuxeo.ecm.platform.audit.api.LogEntry; 065import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 066import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 067import org.nuxeo.ecm.platform.audit.service.AuditBackend; 068import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 069import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 070import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 071import org.nuxeo.runtime.api.Framework; 072import org.nuxeo.runtime.model.DefaultComponent; 073import org.nuxeo.runtime.mongodb.MongoDBComponent; 074import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 075import org.nuxeo.runtime.services.config.ConfigurationService; 076 077import com.fasterxml.jackson.databind.ObjectMapper; 078import com.mongodb.client.FindIterable; 079import com.mongodb.client.MongoCollection; 080import com.mongodb.client.MongoCursor; 081import com.mongodb.client.MongoDatabase; 082import com.mongodb.client.model.Filters; 083import com.mongodb.client.model.Indexes; 084import com.mongodb.client.model.Sorts; 085import com.mongodb.util.JSON; 086 087/** 088 * Implementation of the {@link AuditBackend} interface using MongoDB persistence. 089 * 090 * @since 9.1 091 */ 092public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend { 093 094 private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class); 095 096 public static final String AUDIT_DATABASE_ID = "audit"; 097 098 public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name"; 099 100 public static final String DEFAULT_COLLECTION_NAME = "audit"; 101 102 public static final String SEQ_NAME = "audit"; 103 104 public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 105 106 protected MongoCollection<Document> collection; 107 108 protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider(); 109 110 protected CursorService<MongoCursor<Document>, Document, String> cursorService; 111 112 public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 113 super(component, config); 114 } 115 116 /** 117 * @since 9.3 118 */ 119 public MongoDBAuditBackend() { 120 super(); 121 } 122 123 @Override 124 public int getApplicationStartedOrder() { 125 DefaultComponent component = (DefaultComponent) Framework.getRuntime().getComponent(MongoDBComponent.NAME); 126 return component.getApplicationStartedOrder() + 1; 127 } 128 129 @Override 130 public void onApplicationStarted() { 131 log.info("Activate MongoDB backend for Audit"); 132 // First retrieve the collection name 133 ConfigurationService configurationService = Framework.getService(ConfigurationService.class); 134 String collName = configurationService.getProperty(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME); 135 // Get a connection to MongoDB 136 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 137 MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID); 138 collection = database.getCollection(collName); 139 collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id 140 collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range 141 collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event 142 cursorService = new CursorService<>(doc -> { 143 Object id = doc.remove(MONGODB_ID); 144 if (id != null) { 145 doc.put(LOG_ID, id); 146 } 147 return JSON.serialize(doc); 148 }); 149 } 150 151 @Override 152 public void onApplicationStopped() { 153 collection = null; 154 cursorService.clear(); 155 cursorService = null; 156 } 157 158 /** 159 * @return the {@link MongoCollection} configured with audit settings. 160 */ 161 public MongoCollection<Document> getAuditCollection() { 162 return collection; 163 } 164 165 @Override 166 public List<LogEntry> queryLogs(AuditQueryBuilder builder) { 167 // prepare parameters 168 Predicate predicate = builder.predicate(); 169 OrderByList orders = builder.orders(); 170 long offset = builder.offset(); 171 long limit = builder.limit(); 172 173 // create MongoDB filter 174 Bson mgFilter = createFilter(predicate); 175 176 // create MongoDB order 177 Bson mgOrder = createSort(orders); 178 179 logRequest(mgFilter, mgOrder); 180 FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit); 181 return buildLogEntries(iterable); 182 } 183 184 protected Bson createFilter(Predicate andPredicate) { 185 // cast parameters 186 // current implementation only support a MultiExpression with AND operator 187 @SuppressWarnings("unchecked") 188 List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values); 189 // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right 190 Function<Operand, String> getFieldName = operand -> ((Reference) operand).name; 191 getFieldName = getFieldName.andThen(this::getMongoDBKey); 192 193 List<Bson> filterList = new ArrayList<>(predicates.size()); 194 for (Predicate predicate : predicates) { 195 String leftName = getFieldName.apply(predicate.lvalue); 196 Operator operator = predicate.operator; 197 Object rightValue = Literals.valueOf(predicate.rvalue); 198 if (Operator.EQ.equals(operator)) { 199 filterList.add(Filters.eq(leftName, rightValue)); 200 } else if (Operator.NOTEQ.equals(operator)) { 201 filterList.add(Filters.ne(leftName, rightValue)); 202 } else if (Operator.LT.equals(operator)) { 203 filterList.add(Filters.lt(leftName, predicate.rvalue)); 204 } else if (Operator.LTEQ.equals(operator)) { 205 filterList.add(Filters.lte(leftName, rightValue)); 206 } else if (Operator.GTEQ.equals(operator)) { 207 filterList.add(Filters.gte(leftName, rightValue)); 208 } else if (Operator.GT.equals(operator)) { 209 filterList.add(Filters.gt(leftName, rightValue)); 210 } else if (Operator.IN.equals(operator)) { 211 filterList.add(Filters.in(leftName, (List<?>) rightValue)); 212 } 213 } 214 return Filters.and(filterList); 215 } 216 217 protected Bson createSort(OrderByList orders) { 218 List<Bson> orderList = new ArrayList<>(orders.size()); 219 for (OrderByExpr order : orders) { 220 String name = getMongoDBKey(order.reference.name); 221 if (order.isDescending) { 222 orderList.add(Sorts.descending(name)); 223 } else { 224 orderList.add(Sorts.ascending(name)); 225 } 226 } 227 return Sorts.orderBy(orderList); 228 } 229 230 protected String getMongoDBKey(String key) { 231 if (LOG_ID.equals(key)) { 232 return MONGODB_ID; 233 } 234 return key; 235 } 236 237 @Override 238 public LogEntry getLogEntryByID(long id) { 239 Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first(); 240 if (document == null) { 241 return null; 242 } 243 return MongoDBAuditEntryReader.read(document); 244 } 245 246 @Override 247 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 248 Bson filter = buildFilter(query, params); 249 logRequest(filter, pageNb, pageSize); 250 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 251 return buildLogEntries(iterable); 252 } 253 254 public Bson buildFilter(String query, Map<String, Object> params) { 255 if (params != null && params.size() > 0) { 256 query = expandQueryVariables(query, params); 257 } 258 return Document.parse(query); 259 } 260 261 public String expandQueryVariables(String query, Object[] params) { 262 Map<String, Object> qParams = new HashMap<>(); 263 for (int i = 0; i < params.length; i++) { 264 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 265 qParams.put("param" + i, params[i]); 266 } 267 return expandQueryVariables(query, qParams); 268 } 269 270 public String expandQueryVariables(String query, Map<String, Object> params) { 271 if (params != null && params.size() > 0) { 272 TextTemplate tmpl = new TextTemplate(); 273 // MongoDB date formatter - copied from org.bson.json.JsonWriter 274 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'"); 275 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); 276 for (Entry<String, Object> entry : params.entrySet()) { 277 String key = entry.getKey(); 278 Object value = entry.getValue(); 279 if (value instanceof Calendar) { 280 tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime())); 281 } else if (value instanceof Date) { 282 tmpl.setVariable(key, dateFormat.format(value)); 283 } else if (value != null) { 284 tmpl.setVariable(key, value.toString()); 285 } 286 } 287 query = tmpl.processText(query); 288 } 289 return query; 290 } 291 292 @Override 293 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 294 int pageSize) { 295 List<Bson> list = new ArrayList<>(); 296 if (eventIds != null && eventIds.length > 0) { 297 if (eventIds.length == 1) { 298 list.add(Filters.eq(LOG_EVENT_ID, eventIds[0])); 299 } else { 300 list.add(Filters.in(LOG_EVENT_ID, eventIds)); 301 } 302 } 303 if (categories != null && categories.length > 0) { 304 if (categories.length == 1) { 305 list.add(Filters.eq(LOG_CATEGORY, categories[0])); 306 } else { 307 list.add(Filters.in(LOG_CATEGORY, categories)); 308 } 309 } 310 if (path != null) { 311 list.add(Filters.eq(LOG_DOC_PATH, path)); 312 } 313 if (limit != null) { 314 list.add(Filters.lt(LOG_EVENT_DATE, limit)); 315 } 316 Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list); 317 logRequest(filter, pageNb, pageSize); 318 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 319 return buildLogEntries(iterable); 320 } 321 322 @Override 323 public void addLogEntries(List<LogEntry> entries) { 324 if (entries.isEmpty()) { 325 return; 326 } 327 328 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 329 UIDSequencer seq = uidGeneratorService.getSequencer(); 330 331 List<Document> documents = new ArrayList<>(entries.size()); 332 for (LogEntry entry : entries) { 333 entry.setId(seq.getNextLong(SEQ_NAME)); 334 if (log.isDebugEnabled()) { 335 log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ", 336 Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID())); 337 } 338 documents.add(MongoDBAuditEntryWriter.asDocument(entry)); 339 } 340 collection.insertMany(documents); 341 } 342 343 @Override 344 public Long getEventsCount(String eventId) { 345 return Long.valueOf(collection.count(Filters.eq("eventId", eventId))); 346 } 347 348 @Override 349 public long syncLogCreationEntries(String repoId, String path, Boolean recurs) { 350 return syncLogCreationEntries(provider, repoId, path, recurs); 351 } 352 353 @Override 354 public ExtendedInfo newExtendedInfo(Serializable value) { 355 return new MongoDBExtendedInfo(value); 356 } 357 358 private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) { 359 return StreamSupport.stream(iterable.spliterator(), false) 360 .map(MongoDBAuditEntryReader::read) 361 .collect(Collectors.toList()); 362 } 363 364 private void logRequest(Bson filter, Bson orderBy) { 365 if (log.isDebugEnabled()) { 366 log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy)); 367 } 368 } 369 370 private void logRequest(Bson filter, int pageNb, int pageSize) { 371 if (log.isDebugEnabled()) { 372 log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize); 373 } 374 } 375 376 @Override 377 public void append(List<String> jsonEntries) { 378 // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int 379 List<Document> entries = new ArrayList<>(); 380 for (String json : jsonEntries) { 381 try { 382 LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class); 383 if (entry.getId() == 0) { 384 throw new NuxeoException("A json entry has an empty id. entry=" + json); 385 } 386 Document doc = MongoDBAuditEntryWriter.asDocument(entry); 387 entries.add(doc); 388 } catch (IOException e) { 389 throw new NuxeoException("Unable to deserialize json entry=" + json, e); 390 } 391 } 392 collection.insertMany(entries); 393 } 394 395 @Override 396 public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) { 397 // prepare parameters 398 Predicate predicate = builder.predicate(); 399 OrderByList orders = builder.orders(); 400 401 // create MongoDB filter 402 Bson mgFilter = createFilter(predicate); 403 404 // create MongoDB order 405 Bson mgOrder = createSort(orders); 406 407 logRequest(mgFilter, mgOrder); 408 MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator(); 409 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 410 return scroll(scrollId); 411 } 412 413 @Override 414 public ScrollResult<String> scroll(String scrollId) { 415 return cursorService.scroll(scrollId); 416 } 417 418 public class MongoDBLogEntryProvider implements BaseLogEntryProvider { 419 420 @Override 421 public int removeEntries(String eventId, String pathPattern) { 422 throw new UnsupportedOperationException("Not implemented yet!"); 423 } 424 425 @Override 426 public void addLogEntry(LogEntry logEntry) { 427 List<LogEntry> entries = new ArrayList<>(); 428 entries.add(logEntry); 429 addLogEntries(entries); 430 } 431 432 } 433 434}