001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.mongodb.audit; 020 021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY; 022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH; 023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID; 024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE; 025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID; 026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID; 028 029import java.io.IOException; 030import java.io.Serializable; 031import java.text.SimpleDateFormat; 032import java.time.ZonedDateTime; 033import java.util.ArrayList; 034import java.util.Calendar; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.Map.Entry; 040import java.util.TimeZone; 041import java.util.function.Function; 042import java.util.regex.Pattern; 043import java.util.stream.Collectors; 044import java.util.stream.StreamSupport; 045 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048import org.bson.Document; 049import org.bson.conversions.Bson; 050import org.nuxeo.common.utils.TextTemplate; 051import org.nuxeo.ecm.core.api.CursorService; 052import org.nuxeo.ecm.core.api.NuxeoException; 053import org.nuxeo.ecm.core.api.ScrollResult; 054import org.nuxeo.ecm.core.query.sql.model.Literals; 055import org.nuxeo.ecm.core.query.sql.model.MultiExpression; 056import org.nuxeo.ecm.core.query.sql.model.Operand; 057import org.nuxeo.ecm.core.query.sql.model.Operator; 058import org.nuxeo.ecm.core.query.sql.model.OrderByExpr; 059import org.nuxeo.ecm.core.query.sql.model.OrderByList; 060import org.nuxeo.ecm.core.query.sql.model.Predicate; 061import org.nuxeo.ecm.core.query.sql.model.QueryBuilder; 062import org.nuxeo.ecm.core.query.sql.model.Reference; 063import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 064import org.nuxeo.ecm.core.uidgen.UIDSequencer; 065import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 066import org.nuxeo.ecm.platform.audit.api.LogEntry; 067import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 068import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 069import org.nuxeo.ecm.platform.audit.service.AuditBackend; 070import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 071import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 072import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 073import org.nuxeo.runtime.api.Framework; 074import org.nuxeo.runtime.model.DefaultComponent; 075import org.nuxeo.runtime.mongodb.MongoDBComponent; 076import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 077import org.nuxeo.runtime.services.config.ConfigurationService; 078 079import com.fasterxml.jackson.databind.ObjectMapper; 080import com.mongodb.client.FindIterable; 081import com.mongodb.client.MongoCollection; 082import com.mongodb.client.MongoCursor; 083import com.mongodb.client.MongoDatabase; 084import com.mongodb.client.model.Filters; 085import com.mongodb.client.model.Indexes; 086import com.mongodb.client.model.Sorts; 087 088/** 089 * Implementation of the {@link AuditBackend} interface using MongoDB persistence. 090 * 091 * @since 9.1 092 */ 093public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend { 094 095 private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class); 096 097 public static final String AUDIT_DATABASE_ID = "audit"; 098 099 public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name"; 100 101 public static final String DEFAULT_COLLECTION_NAME = "audit"; 102 103 public static final String SEQ_NAME = "audit"; 104 105 public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 106 107 protected MongoCollection<Document> collection; 108 109 protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider(); 110 111 protected CursorService<MongoCursor<Document>, Document, String> cursorService; 112 113 public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 114 super(component, config); 115 } 116 117 /** 118 * @since 9.3 119 */ 120 public MongoDBAuditBackend() { 121 super(); 122 } 123 124 @Override 125 public int getApplicationStartedOrder() { 126 DefaultComponent component = (DefaultComponent) Framework.getRuntime() 127 .getComponent(MongoDBComponent.COMPONENT_NAME); 128 return component.getApplicationStartedOrder() + 1; 129 } 130 131 @Override 132 public void onApplicationStarted() { 133 log.info("Activate MongoDB backend for Audit"); 134 // First retrieve the collection name 135 ConfigurationService configurationService = Framework.getService(ConfigurationService.class); 136 String collName = configurationService.getString(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME); 137 // Get a connection to MongoDB 138 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 139 MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID); 140 collection = database.getCollection(collName); 141 collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id 142 collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range 143 collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event 144 collection.createIndex(Indexes.ascending(LOG_DOC_PATH)); // query by path 145 cursorService = new CursorService<>(doc -> { 146 Object id = doc.remove(MONGODB_ID); 147 if (id != null) { 148 doc.put(LOG_ID, id); 149 } 150 return doc.toJson(); 151 }); 152 } 153 154 @Override 155 public void onApplicationStopped() { 156 collection = null; 157 cursorService.clear(); 158 cursorService = null; 159 } 160 161 /** 162 * @return the {@link MongoCollection} configured with audit settings. 163 */ 164 public MongoCollection<Document> getAuditCollection() { 165 return collection; 166 } 167 168 @Override 169 public List<LogEntry> queryLogs(QueryBuilder builder) { 170 // prepare parameters 171 MultiExpression predicate = builder.predicate(); 172 OrderByList orders = builder.orders(); 173 long offset = builder.offset(); 174 long limit = builder.limit(); 175 176 // create MongoDB filter 177 Bson mgFilter = createFilter(predicate); 178 179 // create MongoDB order 180 Bson mgOrder = createSort(orders); 181 182 logRequest(mgFilter, mgOrder); 183 FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit); 184 return buildLogEntries(iterable); 185 } 186 187 protected Bson createFilter(MultiExpression andPredicate) { 188 // cast parameters 189 // current implementation only support a MultiExpression with AND operator 190 List<Predicate> predicates = andPredicate.predicates; 191 // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right 192 Function<Operand, String> getFieldName = operand -> ((Reference) operand).name; 193 getFieldName = getFieldName.andThen(this::getMongoDBKey); 194 195 List<Bson> filterList = new ArrayList<>(predicates.size()); 196 for (Predicate predicate : predicates) { 197 String leftName = getFieldName.apply(predicate.lvalue); 198 Operator operator = predicate.operator; 199 Object rightValue = Literals.valueOf(predicate.rvalue); 200 if (rightValue instanceof ZonedDateTime) { 201 // The ZonedDateTime representation is not compatible with MongoDB query 202 rightValue = Date.from(((ZonedDateTime) rightValue).toInstant()); 203 } 204 if (Operator.EQ.equals(operator)) { 205 filterList.add(Filters.eq(leftName, rightValue)); 206 } else if (Operator.NOTEQ.equals(operator)) { 207 filterList.add(Filters.ne(leftName, rightValue)); 208 } else if (Operator.LT.equals(operator)) { 209 filterList.add(Filters.lt(leftName, rightValue)); 210 } else if (Operator.LTEQ.equals(operator)) { 211 filterList.add(Filters.lte(leftName, rightValue)); 212 } else if (Operator.GTEQ.equals(operator)) { 213 filterList.add(Filters.gte(leftName, rightValue)); 214 } else if (Operator.GT.equals(operator)) { 215 filterList.add(Filters.gt(leftName, rightValue)); 216 } else if (Operator.IN.equals(operator)) { 217 filterList.add(Filters.in(leftName, (List<?>) rightValue)); 218 } else if (Operator.STARTSWITH.equals(operator)) { 219 filterList.add(Filters.regex(leftName, "^" + Pattern.quote(String.valueOf(rightValue)))); 220 } 221 } 222 return Filters.and(filterList); 223 } 224 225 protected Bson createSort(OrderByList orders) { 226 List<Bson> orderList = new ArrayList<>(orders.size()); 227 for (OrderByExpr order : orders) { 228 String name = getMongoDBKey(order.reference.name); 229 if (order.isDescending) { 230 orderList.add(Sorts.descending(name)); 231 } else { 232 orderList.add(Sorts.ascending(name)); 233 } 234 } 235 return Sorts.orderBy(orderList); 236 } 237 238 protected String getMongoDBKey(String key) { 239 if (LOG_ID.equals(key)) { 240 return MONGODB_ID; 241 } 242 return key; 243 } 244 245 @Override 246 public LogEntry getLogEntryByID(long id) { 247 Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first(); 248 if (document == null) { 249 return null; 250 } 251 return MongoDBAuditEntryReader.read(document); 252 } 253 254 @Override 255 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 256 Bson filter = buildFilter(query, params); 257 logRequest(filter, pageNb, pageSize); 258 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 259 return buildLogEntries(iterable); 260 } 261 262 public Bson buildFilter(String query, Map<String, Object> params) { 263 if (params != null && params.size() > 0) { 264 query = expandQueryVariables(query, params); 265 } 266 return Document.parse(query); 267 } 268 269 public String expandQueryVariables(String query, Object[] params) { 270 Map<String, Object> qParams = new HashMap<>(); 271 for (int i = 0; i < params.length; i++) { 272 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 273 qParams.put("param" + i, params[i]); 274 } 275 return expandQueryVariables(query, qParams); 276 } 277 278 public String expandQueryVariables(String query, Map<String, Object> params) { 279 if (params != null && params.size() > 0) { 280 TextTemplate tmpl = new TextTemplate(); 281 // MongoDB date formatter - copied from org.bson.json.JsonWriter 282 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'"); 283 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); 284 for (Entry<String, Object> entry : params.entrySet()) { 285 String key = entry.getKey(); 286 Object value = entry.getValue(); 287 if (value instanceof Calendar) { 288 tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime())); 289 } else if (value instanceof Date) { 290 tmpl.setVariable(key, dateFormat.format(value)); 291 } else if (value != null) { 292 tmpl.setVariable(key, value.toString()); 293 } 294 } 295 query = tmpl.processText(query); 296 } 297 return query; 298 } 299 300 @Override 301 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 302 int pageSize) { 303 List<Bson> list = new ArrayList<>(); 304 if (eventIds != null && eventIds.length > 0) { 305 if (eventIds.length == 1) { 306 list.add(Filters.eq(LOG_EVENT_ID, eventIds[0])); 307 } else { 308 list.add(Filters.in(LOG_EVENT_ID, eventIds)); 309 } 310 } 311 if (categories != null && categories.length > 0) { 312 if (categories.length == 1) { 313 list.add(Filters.eq(LOG_CATEGORY, categories[0])); 314 } else { 315 list.add(Filters.in(LOG_CATEGORY, categories)); 316 } 317 } 318 if (path != null) { 319 list.add(Filters.eq(LOG_DOC_PATH, path)); 320 } 321 if (limit != null) { 322 list.add(Filters.lt(LOG_EVENT_DATE, limit)); 323 } 324 Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list); 325 logRequest(filter, pageNb, pageSize); 326 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 327 return buildLogEntries(iterable); 328 } 329 330 @Override 331 public void addLogEntries(List<LogEntry> entries) { 332 if (entries.isEmpty()) { 333 return; 334 } 335 336 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 337 UIDSequencer seq = uidGeneratorService.getSequencer(); 338 339 List<Document> documents = new ArrayList<>(entries.size()); 340 List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size()); 341 for (int i = 0; i < entries.size(); i++) { 342 LogEntry entry = entries.get(i); 343 entry.setId(block.get(i)); 344 if (log.isDebugEnabled()) { 345 log.debug(String.format("Indexing log entry Id: %s, with logDate : %s, for docUUID: %s ", 346 Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID())); 347 } 348 documents.add(MongoDBAuditEntryWriter.asDocument(entry)); 349 } 350 collection.insertMany(documents); 351 } 352 353 @Override 354 public Long getEventsCount(String eventId) { 355 return Long.valueOf(collection.countDocuments(Filters.eq("eventId", eventId))); 356 } 357 358 @Override 359 public long syncLogCreationEntries(String repoId, String path, Boolean recurs) { 360 return syncLogCreationEntries(provider, repoId, path, recurs); 361 } 362 363 @Override 364 public ExtendedInfo newExtendedInfo(Serializable value) { 365 return new MongoDBExtendedInfo(value); 366 } 367 368 private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) { 369 return StreamSupport.stream(iterable.spliterator(), false) 370 .map(MongoDBAuditEntryReader::read) 371 .collect(Collectors.toList()); 372 } 373 374 private void logRequest(Bson filter, Bson orderBy) { 375 if (log.isDebugEnabled()) { 376 log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy)); 377 } 378 } 379 380 private void logRequest(Bson filter, int pageNb, int pageSize) { 381 if (log.isDebugEnabled()) { 382 log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize); 383 } 384 } 385 386 @Override 387 public void append(List<String> jsonEntries) { 388 // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int 389 List<Document> entries = new ArrayList<>(); 390 for (String json : jsonEntries) { 391 try { 392 LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class); 393 if (entry.getId() == 0) { 394 throw new NuxeoException("A json entry has an empty id. entry=" + json); 395 } 396 Document doc = MongoDBAuditEntryWriter.asDocument(entry); 397 entries.add(doc); 398 } catch (IOException e) { 399 throw new NuxeoException("Unable to deserialize json entry=" + json, e); 400 } 401 } 402 collection.insertMany(entries); 403 } 404 405 @SuppressWarnings("resource") // CursorResult is being registered, must not be closed 406 @Override 407 public ScrollResult<String> scroll(QueryBuilder builder, int batchSize, int keepAliveSeconds) { 408 // prepare parameters 409 MultiExpression predicate = builder.predicate(); 410 OrderByList orders = builder.orders(); 411 412 // create MongoDB filter 413 Bson mgFilter = createFilter(predicate); 414 415 // create MongoDB order 416 Bson mgOrder = createSort(orders); 417 418 logRequest(mgFilter, mgOrder); 419 MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator(); 420 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 421 return scroll(scrollId); 422 } 423 424 @Override 425 public ScrollResult<String> scroll(String scrollId) { 426 return cursorService.scroll(scrollId); 427 } 428 429 public class MongoDBLogEntryProvider implements BaseLogEntryProvider { 430 431 @Override 432 public int removeEntries(String eventId, String pathPattern) { 433 throw new UnsupportedOperationException("Not implemented yet!"); 434 } 435 436 @Override 437 public void addLogEntry(LogEntry logEntry) { 438 List<LogEntry> entries = new ArrayList<>(); 439 entries.add(logEntry); 440 addLogEntries(entries); 441 } 442 443 } 444 445}