001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.mongodb.audit; 020 021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY; 022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH; 023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID; 024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE; 025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID; 026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID; 028 029import java.io.IOException; 030import java.io.Serializable; 031import java.text.SimpleDateFormat; 032import java.util.ArrayList; 033import java.util.Calendar; 034import java.util.Date; 035import java.util.HashMap; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.TimeZone; 040import java.util.function.Function; 041import java.util.regex.Pattern; 042import java.util.stream.Collectors; 043import java.util.stream.StreamSupport; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.bson.Document; 048import org.bson.conversions.Bson; 049import org.nuxeo.common.utils.TextTemplate; 050import org.nuxeo.ecm.core.api.CursorService; 051import org.nuxeo.ecm.core.api.NuxeoException; 052import org.nuxeo.ecm.core.api.ScrollResult; 053import org.nuxeo.ecm.core.query.sql.model.Literals; 054import org.nuxeo.ecm.core.query.sql.model.MultiExpression; 055import org.nuxeo.ecm.core.query.sql.model.Operand; 056import org.nuxeo.ecm.core.query.sql.model.Operator; 057import org.nuxeo.ecm.core.query.sql.model.OrderByExpr; 058import org.nuxeo.ecm.core.query.sql.model.OrderByList; 059import org.nuxeo.ecm.core.query.sql.model.Predicate; 060import org.nuxeo.ecm.core.query.sql.model.QueryBuilder; 061import org.nuxeo.ecm.core.query.sql.model.Reference; 062import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 063import org.nuxeo.ecm.core.uidgen.UIDSequencer; 064import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 065import org.nuxeo.ecm.platform.audit.api.LogEntry; 066import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 067import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 068import org.nuxeo.ecm.platform.audit.service.AuditBackend; 069import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 070import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 071import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 072import org.nuxeo.runtime.api.Framework; 073import org.nuxeo.runtime.model.DefaultComponent; 074import org.nuxeo.runtime.mongodb.MongoDBComponent; 075import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 076import org.nuxeo.runtime.services.config.ConfigurationService; 077 078import com.fasterxml.jackson.databind.ObjectMapper; 079import com.mongodb.client.FindIterable; 080import com.mongodb.client.MongoCollection; 081import com.mongodb.client.MongoCursor; 082import com.mongodb.client.MongoDatabase; 083import com.mongodb.client.model.Filters; 084import com.mongodb.client.model.Indexes; 085import com.mongodb.client.model.Sorts; 086import com.mongodb.util.JSON; 087 088/** 089 * Implementation of the {@link AuditBackend} interface using MongoDB persistence. 090 * 091 * @since 9.1 092 */ 093public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend { 094 095 private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class); 096 097 public static final String AUDIT_DATABASE_ID = "audit"; 098 099 public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name"; 100 101 public static final String DEFAULT_COLLECTION_NAME = "audit"; 102 103 public static final String SEQ_NAME = "audit"; 104 105 public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 106 107 protected MongoCollection<Document> collection; 108 109 protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider(); 110 111 protected CursorService<MongoCursor<Document>, Document, String> cursorService; 112 113 public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 114 super(component, config); 115 } 116 117 /** 118 * @since 9.3 119 */ 120 public MongoDBAuditBackend() { 121 super(); 122 } 123 124 @Override 125 public int getApplicationStartedOrder() { 126 DefaultComponent component = (DefaultComponent) Framework.getRuntime() 127 .getComponent(MongoDBComponent.COMPONENT_NAME); 128 return component.getApplicationStartedOrder() + 1; 129 } 130 131 @Override 132 public void onApplicationStarted() { 133 log.info("Activate MongoDB backend for Audit"); 134 // First retrieve the collection name 135 ConfigurationService configurationService = Framework.getService(ConfigurationService.class); 136 String collName = configurationService.getProperty(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME); 137 // Get a connection to MongoDB 138 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 139 MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID); 140 collection = database.getCollection(collName); 141 collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id 142 collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range 143 collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event 144 collection.createIndex(Indexes.ascending(LOG_DOC_PATH)); // query by path 145 cursorService = new CursorService<>(doc -> { 146 Object id = doc.remove(MONGODB_ID); 147 if (id != null) { 148 doc.put(LOG_ID, id); 149 } 150 return JSON.serialize(doc); 151 }); 152 } 153 154 @Override 155 public void onApplicationStopped() { 156 collection = null; 157 cursorService.clear(); 158 cursorService = null; 159 } 160 161 /** 162 * @return the {@link MongoCollection} configured with audit settings. 163 */ 164 public MongoCollection<Document> getAuditCollection() { 165 return collection; 166 } 167 168 @Override 169 public List<LogEntry> queryLogs(QueryBuilder builder) { 170 // prepare parameters 171 MultiExpression predicate = builder.predicate(); 172 OrderByList orders = builder.orders(); 173 long offset = builder.offset(); 174 long limit = builder.limit(); 175 176 // create MongoDB filter 177 Bson mgFilter = createFilter(predicate); 178 179 // create MongoDB order 180 Bson mgOrder = createSort(orders); 181 182 logRequest(mgFilter, mgOrder); 183 FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit); 184 return buildLogEntries(iterable); 185 } 186 187 protected Bson createFilter(MultiExpression andPredicate) { 188 // cast parameters 189 // current implementation only support a MultiExpression with AND operator 190 List<Predicate> predicates = andPredicate.predicates; 191 // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right 192 Function<Operand, String> getFieldName = operand -> ((Reference) operand).name; 193 getFieldName = getFieldName.andThen(this::getMongoDBKey); 194 195 List<Bson> filterList = new ArrayList<>(predicates.size()); 196 for (Predicate predicate : predicates) { 197 String leftName = getFieldName.apply(predicate.lvalue); 198 Operator operator = predicate.operator; 199 Object rightValue = Literals.valueOf(predicate.rvalue); 200 if (Operator.EQ.equals(operator)) { 201 filterList.add(Filters.eq(leftName, rightValue)); 202 } else if (Operator.NOTEQ.equals(operator)) { 203 filterList.add(Filters.ne(leftName, rightValue)); 204 } else if (Operator.LT.equals(operator)) { 205 filterList.add(Filters.lt(leftName, predicate.rvalue)); 206 } else if (Operator.LTEQ.equals(operator)) { 207 filterList.add(Filters.lte(leftName, rightValue)); 208 } else if (Operator.GTEQ.equals(operator)) { 209 filterList.add(Filters.gte(leftName, rightValue)); 210 } else if (Operator.GT.equals(operator)) { 211 filterList.add(Filters.gt(leftName, rightValue)); 212 } else if (Operator.IN.equals(operator)) { 213 filterList.add(Filters.in(leftName, (List<?>) rightValue)); 214 } else if (Operator.STARTSWITH.equals(operator)) { 215 filterList.add(Filters.regex(leftName, "^" + Pattern.quote(String.valueOf(rightValue)))); 216 } 217 } 218 return Filters.and(filterList); 219 } 220 221 protected Bson createSort(OrderByList orders) { 222 List<Bson> orderList = new ArrayList<>(orders.size()); 223 for (OrderByExpr order : orders) { 224 String name = getMongoDBKey(order.reference.name); 225 if (order.isDescending) { 226 orderList.add(Sorts.descending(name)); 227 } else { 228 orderList.add(Sorts.ascending(name)); 229 } 230 } 231 return Sorts.orderBy(orderList); 232 } 233 234 protected String getMongoDBKey(String key) { 235 if (LOG_ID.equals(key)) { 236 return MONGODB_ID; 237 } 238 return key; 239 } 240 241 @Override 242 public LogEntry getLogEntryByID(long id) { 243 Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first(); 244 if (document == null) { 245 return null; 246 } 247 return MongoDBAuditEntryReader.read(document); 248 } 249 250 @Override 251 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 252 Bson filter = buildFilter(query, params); 253 logRequest(filter, pageNb, pageSize); 254 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 255 return buildLogEntries(iterable); 256 } 257 258 public Bson buildFilter(String query, Map<String, Object> params) { 259 if (params != null && params.size() > 0) { 260 query = expandQueryVariables(query, params); 261 } 262 return Document.parse(query); 263 } 264 265 public String expandQueryVariables(String query, Object[] params) { 266 Map<String, Object> qParams = new HashMap<>(); 267 for (int i = 0; i < params.length; i++) { 268 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 269 qParams.put("param" + i, params[i]); 270 } 271 return expandQueryVariables(query, qParams); 272 } 273 274 public String expandQueryVariables(String query, Map<String, Object> params) { 275 if (params != null && params.size() > 0) { 276 TextTemplate tmpl = new TextTemplate(); 277 // MongoDB date formatter - copied from org.bson.json.JsonWriter 278 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'"); 279 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); 280 for (Entry<String, Object> entry : params.entrySet()) { 281 String key = entry.getKey(); 282 Object value = entry.getValue(); 283 if (value instanceof Calendar) { 284 tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime())); 285 } else if (value instanceof Date) { 286 tmpl.setVariable(key, dateFormat.format(value)); 287 } else if (value != null) { 288 tmpl.setVariable(key, value.toString()); 289 } 290 } 291 query = tmpl.processText(query); 292 } 293 return query; 294 } 295 296 @Override 297 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 298 int pageSize) { 299 List<Bson> list = new ArrayList<>(); 300 if (eventIds != null && eventIds.length > 0) { 301 if (eventIds.length == 1) { 302 list.add(Filters.eq(LOG_EVENT_ID, eventIds[0])); 303 } else { 304 list.add(Filters.in(LOG_EVENT_ID, eventIds)); 305 } 306 } 307 if (categories != null && categories.length > 0) { 308 if (categories.length == 1) { 309 list.add(Filters.eq(LOG_CATEGORY, categories[0])); 310 } else { 311 list.add(Filters.in(LOG_CATEGORY, categories)); 312 } 313 } 314 if (path != null) { 315 list.add(Filters.eq(LOG_DOC_PATH, path)); 316 } 317 if (limit != null) { 318 list.add(Filters.lt(LOG_EVENT_DATE, limit)); 319 } 320 Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list); 321 logRequest(filter, pageNb, pageSize); 322 FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize); 323 return buildLogEntries(iterable); 324 } 325 326 @Override 327 public void addLogEntries(List<LogEntry> entries) { 328 if (entries.isEmpty()) { 329 return; 330 } 331 332 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 333 UIDSequencer seq = uidGeneratorService.getSequencer(); 334 335 List<Document> documents = new ArrayList<>(entries.size()); 336 List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size()); 337 for (int i = 0; i < entries.size(); i++) { 338 LogEntry entry = entries.get(i); 339 entry.setId(block.get(i)); 340 if (log.isDebugEnabled()) { 341 log.debug(String.format("Indexing log entry Id: %s, with logDate : %s, for docUUID: %s ", 342 Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID())); 343 } 344 documents.add(MongoDBAuditEntryWriter.asDocument(entry)); 345 } 346 collection.insertMany(documents); 347 } 348 349 @Override 350 public Long getEventsCount(String eventId) { 351 return Long.valueOf(collection.count(Filters.eq("eventId", eventId))); 352 } 353 354 @Override 355 public long syncLogCreationEntries(String repoId, String path, Boolean recurs) { 356 return syncLogCreationEntries(provider, repoId, path, recurs); 357 } 358 359 @Override 360 public ExtendedInfo newExtendedInfo(Serializable value) { 361 return new MongoDBExtendedInfo(value); 362 } 363 364 private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) { 365 return StreamSupport.stream(iterable.spliterator(), false) 366 .map(MongoDBAuditEntryReader::read) 367 .collect(Collectors.toList()); 368 } 369 370 private void logRequest(Bson filter, Bson orderBy) { 371 if (log.isDebugEnabled()) { 372 log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy)); 373 } 374 } 375 376 private void logRequest(Bson filter, int pageNb, int pageSize) { 377 if (log.isDebugEnabled()) { 378 log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize); 379 } 380 } 381 382 @Override 383 public void append(List<String> jsonEntries) { 384 // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int 385 List<Document> entries = new ArrayList<>(); 386 for (String json : jsonEntries) { 387 try { 388 LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class); 389 if (entry.getId() == 0) { 390 throw new NuxeoException("A json entry has an empty id. entry=" + json); 391 } 392 Document doc = MongoDBAuditEntryWriter.asDocument(entry); 393 entries.add(doc); 394 } catch (IOException e) { 395 throw new NuxeoException("Unable to deserialize json entry=" + json, e); 396 } 397 } 398 collection.insertMany(entries); 399 } 400 401 @Override 402 public ScrollResult<String> scroll(QueryBuilder builder, int batchSize, int keepAliveSeconds) { 403 // prepare parameters 404 MultiExpression predicate = builder.predicate(); 405 OrderByList orders = builder.orders(); 406 407 // create MongoDB filter 408 Bson mgFilter = createFilter(predicate); 409 410 // create MongoDB order 411 Bson mgOrder = createSort(orders); 412 413 logRequest(mgFilter, mgOrder); 414 MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator(); 415 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 416 return scroll(scrollId); 417 } 418 419 @Override 420 public ScrollResult<String> scroll(String scrollId) { 421 return cursorService.scroll(scrollId); 422 } 423 424 public class MongoDBLogEntryProvider implements BaseLogEntryProvider { 425 426 @Override 427 public int removeEntries(String eventId, String pathPattern) { 428 throw new UnsupportedOperationException("Not implemented yet!"); 429 } 430 431 @Override 432 public void addLogEntry(LogEntry logEntry) { 433 List<LogEntry> entries = new ArrayList<>(); 434 entries.add(logEntry); 435 addLogEntries(entries); 436 } 437 438 } 439 440}