001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.mongodb.audit;
020
021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY;
022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH;
023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE;
025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID;
026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID;
028
029import java.io.IOException;
030import java.io.Serializable;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Calendar;
034import java.util.Date;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.TimeZone;
040import java.util.function.Function;
041import java.util.regex.Pattern;
042import java.util.stream.Collectors;
043import java.util.stream.StreamSupport;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.bson.Document;
048import org.bson.conversions.Bson;
049import org.nuxeo.common.utils.TextTemplate;
050import org.nuxeo.ecm.core.api.CursorService;
051import org.nuxeo.ecm.core.api.NuxeoException;
052import org.nuxeo.ecm.core.api.ScrollResult;
053import org.nuxeo.ecm.core.query.sql.model.Literals;
054import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
055import org.nuxeo.ecm.core.query.sql.model.Operand;
056import org.nuxeo.ecm.core.query.sql.model.Operator;
057import org.nuxeo.ecm.core.query.sql.model.OrderByExpr;
058import org.nuxeo.ecm.core.query.sql.model.OrderByList;
059import org.nuxeo.ecm.core.query.sql.model.Predicate;
060import org.nuxeo.ecm.core.query.sql.model.Reference;
061import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
062import org.nuxeo.ecm.core.uidgen.UIDSequencer;
063import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
064import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
065import org.nuxeo.ecm.platform.audit.api.LogEntry;
066import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
067import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
068import org.nuxeo.ecm.platform.audit.service.AuditBackend;
069import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
070import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
071import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
072import org.nuxeo.runtime.api.Framework;
073import org.nuxeo.runtime.model.DefaultComponent;
074import org.nuxeo.runtime.mongodb.MongoDBComponent;
075import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
076import org.nuxeo.runtime.services.config.ConfigurationService;
077
078import com.fasterxml.jackson.databind.ObjectMapper;
079import com.mongodb.client.FindIterable;
080import com.mongodb.client.MongoCollection;
081import com.mongodb.client.MongoCursor;
082import com.mongodb.client.MongoDatabase;
083import com.mongodb.client.model.Filters;
084import com.mongodb.client.model.Indexes;
085import com.mongodb.client.model.Sorts;
086import com.mongodb.util.JSON;
087
088/**
089 * Implementation of the {@link AuditBackend} interface using MongoDB persistence.
090 *
091 * @since 9.1
092 */
093public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend {
094
095    private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class);
096
097    public static final String AUDIT_DATABASE_ID = "audit";
098
099    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name";
100
101    public static final String DEFAULT_COLLECTION_NAME = "audit";
102
103    public static final String SEQ_NAME = "audit";
104
105    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
106
107    protected MongoCollection<Document> collection;
108
109    protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider();
110
111    protected CursorService<MongoCursor<Document>, Document, String> cursorService;
112
113    public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
114        super(component, config);
115    }
116
117    /**
118     * @since 9.3
119     */
120    public MongoDBAuditBackend() {
121        super();
122    }
123
124    @Override
125    public int getApplicationStartedOrder() {
126        DefaultComponent component = (DefaultComponent) Framework.getRuntime().getComponent(MongoDBComponent.NAME);
127        return component.getApplicationStartedOrder() + 1;
128    }
129
130    @Override
131    public void onApplicationStarted() {
132        log.info("Activate MongoDB backend for Audit");
133        // First retrieve the collection name
134        ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
135        String collName = configurationService.getProperty(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
136        // Get a connection to MongoDB
137        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
138        MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID);
139        collection = database.getCollection(collName);
140        collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id
141        collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range
142        collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event
143        collection.createIndex(Indexes.ascending(LOG_DOC_PATH)); // query by path
144        cursorService = new CursorService<>(doc -> {
145            Object id = doc.remove(MONGODB_ID);
146            if (id != null) {
147                doc.put(LOG_ID, id);
148            }
149            return JSON.serialize(doc);
150        });
151    }
152
153    @Override
154    public void onApplicationStopped() {
155        collection = null;
156        cursorService.clear();
157        cursorService = null;
158    }
159
160    /**
161     * @return the {@link MongoCollection} configured with audit settings.
162     */
163    public MongoCollection<Document> getAuditCollection() {
164        return collection;
165    }
166
167    @Override
168    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
169        // prepare parameters
170        Predicate predicate = builder.predicate();
171        OrderByList orders = builder.orders();
172        long offset = builder.offset();
173        long limit = builder.limit();
174
175        // create MongoDB filter
176        Bson mgFilter = createFilter(predicate);
177
178        // create MongoDB order
179        Bson mgOrder = createSort(orders);
180
181        logRequest(mgFilter, mgOrder);
182        FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit);
183        return buildLogEntries(iterable);
184    }
185
186    protected Bson createFilter(Predicate andPredicate) {
187        // cast parameters
188        // current implementation only support a MultiExpression with AND operator
189        @SuppressWarnings("unchecked")
190        List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values);
191        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
192        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
193        getFieldName = getFieldName.andThen(this::getMongoDBKey);
194
195        List<Bson> filterList = new ArrayList<>(predicates.size());
196        for (Predicate predicate : predicates) {
197            String leftName = getFieldName.apply(predicate.lvalue);
198            Operator operator = predicate.operator;
199            Object rightValue = Literals.valueOf(predicate.rvalue);
200            if (Operator.EQ.equals(operator)) {
201                filterList.add(Filters.eq(leftName, rightValue));
202            } else if (Operator.NOTEQ.equals(operator)) {
203                filterList.add(Filters.ne(leftName, rightValue));
204            } else if (Operator.LT.equals(operator)) {
205                filterList.add(Filters.lt(leftName, predicate.rvalue));
206            } else if (Operator.LTEQ.equals(operator)) {
207                filterList.add(Filters.lte(leftName, rightValue));
208            } else if (Operator.GTEQ.equals(operator)) {
209                filterList.add(Filters.gte(leftName, rightValue));
210            } else if (Operator.GT.equals(operator)) {
211                filterList.add(Filters.gt(leftName, rightValue));
212            } else if (Operator.IN.equals(operator)) {
213                filterList.add(Filters.in(leftName, (List<?>) rightValue));
214            } else if (Operator.STARTSWITH.equals(operator)) {
215                filterList.add(Filters.regex(leftName, "^" + Pattern.quote(String.valueOf(rightValue))));
216            }
217        }
218        return Filters.and(filterList);
219    }
220
221    protected Bson createSort(OrderByList orders) {
222        List<Bson> orderList = new ArrayList<>(orders.size());
223        for (OrderByExpr order : orders) {
224            String name = getMongoDBKey(order.reference.name);
225            if (order.isDescending) {
226                orderList.add(Sorts.descending(name));
227            } else {
228                orderList.add(Sorts.ascending(name));
229            }
230        }
231        return Sorts.orderBy(orderList);
232    }
233
234    protected String getMongoDBKey(String key) {
235        if (LOG_ID.equals(key)) {
236            return MONGODB_ID;
237        }
238        return key;
239    }
240
241    @Override
242    public LogEntry getLogEntryByID(long id) {
243        Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first();
244        if (document == null) {
245            return null;
246        }
247        return MongoDBAuditEntryReader.read(document);
248    }
249
250    @Override
251    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
252        Bson filter = buildFilter(query, params);
253        logRequest(filter, pageNb, pageSize);
254        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
255        return buildLogEntries(iterable);
256    }
257
258    public Bson buildFilter(String query, Map<String, Object> params) {
259        if (params != null && params.size() > 0) {
260            query = expandQueryVariables(query, params);
261        }
262        return Document.parse(query);
263    }
264
265    public String expandQueryVariables(String query, Object[] params) {
266        Map<String, Object> qParams = new HashMap<>();
267        for (int i = 0; i < params.length; i++) {
268            query = query.replaceFirst("\\?", "\\${param" + i + "}");
269            qParams.put("param" + i, params[i]);
270        }
271        return expandQueryVariables(query, qParams);
272    }
273
274    public String expandQueryVariables(String query, Map<String, Object> params) {
275        if (params != null && params.size() > 0) {
276            TextTemplate tmpl = new TextTemplate();
277            // MongoDB date formatter - copied from org.bson.json.JsonWriter
278            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'");
279            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
280            for (Entry<String, Object> entry : params.entrySet()) {
281                String key = entry.getKey();
282                Object value = entry.getValue();
283                if (value instanceof Calendar) {
284                    tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime()));
285                } else if (value instanceof Date) {
286                    tmpl.setVariable(key, dateFormat.format(value));
287                } else if (value != null) {
288                    tmpl.setVariable(key, value.toString());
289                }
290            }
291            query = tmpl.processText(query);
292        }
293        return query;
294    }
295
296    @Override
297    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
298            int pageSize) {
299        List<Bson> list = new ArrayList<>();
300        if (eventIds != null && eventIds.length > 0) {
301            if (eventIds.length == 1) {
302                list.add(Filters.eq(LOG_EVENT_ID, eventIds[0]));
303            } else {
304                list.add(Filters.in(LOG_EVENT_ID, eventIds));
305            }
306        }
307        if (categories != null && categories.length > 0) {
308            if (categories.length == 1) {
309                list.add(Filters.eq(LOG_CATEGORY, categories[0]));
310            } else {
311                list.add(Filters.in(LOG_CATEGORY, categories));
312            }
313        }
314        if (path != null) {
315            list.add(Filters.eq(LOG_DOC_PATH, path));
316        }
317        if (limit != null) {
318            list.add(Filters.lt(LOG_EVENT_DATE, limit));
319        }
320        Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list);
321        logRequest(filter, pageNb, pageSize);
322        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
323        return buildLogEntries(iterable);
324    }
325
326    @Override
327    public void addLogEntries(List<LogEntry> entries) {
328        if (entries.isEmpty()) {
329            return;
330        }
331
332        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
333        UIDSequencer seq = uidGeneratorService.getSequencer();
334
335        List<Document> documents = new ArrayList<>(entries.size());
336        for (LogEntry entry : entries) {
337            entry.setId(seq.getNextLong(SEQ_NAME));
338            if (log.isDebugEnabled()) {
339                log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
340                        Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID()));
341            }
342            documents.add(MongoDBAuditEntryWriter.asDocument(entry));
343        }
344        collection.insertMany(documents);
345    }
346
347    @Override
348    public Long getEventsCount(String eventId) {
349        return Long.valueOf(collection.count(Filters.eq("eventId", eventId)));
350    }
351
352    @Override
353    public long syncLogCreationEntries(String repoId, String path, Boolean recurs) {
354        return syncLogCreationEntries(provider, repoId, path, recurs);
355    }
356
357    @Override
358    public ExtendedInfo newExtendedInfo(Serializable value) {
359        return new MongoDBExtendedInfo(value);
360    }
361
362    private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) {
363        return StreamSupport.stream(iterable.spliterator(), false)
364                            .map(MongoDBAuditEntryReader::read)
365                            .collect(Collectors.toList());
366    }
367
368    private void logRequest(Bson filter, Bson orderBy) {
369        if (log.isDebugEnabled()) {
370            log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy));
371        }
372    }
373
374    private void logRequest(Bson filter, int pageNb, int pageSize) {
375        if (log.isDebugEnabled()) {
376            log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize);
377        }
378    }
379
380    @Override
381    public void append(List<String> jsonEntries) {
382        // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int
383        List<Document> entries = new ArrayList<>();
384        for (String json : jsonEntries) {
385            try {
386                LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class);
387                if (entry.getId() == 0) {
388                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
389                }
390                Document doc = MongoDBAuditEntryWriter.asDocument(entry);
391                entries.add(doc);
392            } catch (IOException e) {
393                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
394            }
395        }
396        collection.insertMany(entries);
397    }
398
399    @Override
400    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
401        // prepare parameters
402        Predicate predicate = builder.predicate();
403        OrderByList orders = builder.orders();
404
405        // create MongoDB filter
406        Bson mgFilter = createFilter(predicate);
407
408        // create MongoDB order
409        Bson mgOrder = createSort(orders);
410
411        logRequest(mgFilter, mgOrder);
412        MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator();
413        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
414        return scroll(scrollId);
415    }
416
417    @Override
418    public ScrollResult<String> scroll(String scrollId) {
419        return cursorService.scroll(scrollId);
420    }
421
422    public class MongoDBLogEntryProvider implements BaseLogEntryProvider {
423
424        @Override
425        public int removeEntries(String eventId, String pathPattern) {
426            throw new UnsupportedOperationException("Not implemented yet!");
427        }
428
429        @Override
430        public void addLogEntry(LogEntry logEntry) {
431            List<LogEntry> entries = new ArrayList<>();
432            entries.add(logEntry);
433            addLogEntries(entries);
434        }
435
436    }
437
438}