001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.mongodb.audit;
020
021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY;
022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH;
023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE;
025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID;
026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID;
028
029import java.io.IOException;
030import java.io.Serializable;
031import java.text.SimpleDateFormat;
032import java.time.ZonedDateTime;
033import java.util.ArrayList;
034import java.util.Calendar;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Map.Entry;
040import java.util.TimeZone;
041import java.util.function.Function;
042import java.util.regex.Pattern;
043import java.util.stream.Collectors;
044import java.util.stream.StreamSupport;
045
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.bson.Document;
049import org.bson.conversions.Bson;
050import org.nuxeo.common.utils.TextTemplate;
051import org.nuxeo.ecm.core.api.CursorService;
052import org.nuxeo.ecm.core.api.NuxeoException;
053import org.nuxeo.ecm.core.api.ScrollResult;
054import org.nuxeo.ecm.core.query.sql.model.Literals;
055import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
056import org.nuxeo.ecm.core.query.sql.model.Operand;
057import org.nuxeo.ecm.core.query.sql.model.Operator;
058import org.nuxeo.ecm.core.query.sql.model.OrderByExpr;
059import org.nuxeo.ecm.core.query.sql.model.OrderByList;
060import org.nuxeo.ecm.core.query.sql.model.Predicate;
061import org.nuxeo.ecm.core.query.sql.model.QueryBuilder;
062import org.nuxeo.ecm.core.query.sql.model.Reference;
063import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
064import org.nuxeo.ecm.core.uidgen.UIDSequencer;
065import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
066import org.nuxeo.ecm.platform.audit.api.LogEntry;
067import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
068import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
069import org.nuxeo.ecm.platform.audit.service.AuditBackend;
070import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
071import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
072import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
073import org.nuxeo.runtime.api.Framework;
074import org.nuxeo.runtime.model.DefaultComponent;
075import org.nuxeo.runtime.mongodb.MongoDBComponent;
076import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
077import org.nuxeo.runtime.services.config.ConfigurationService;
078
079import com.fasterxml.jackson.databind.ObjectMapper;
080import com.mongodb.client.FindIterable;
081import com.mongodb.client.MongoCollection;
082import com.mongodb.client.MongoCursor;
083import com.mongodb.client.MongoDatabase;
084import com.mongodb.client.model.Filters;
085import com.mongodb.client.model.Indexes;
086import com.mongodb.client.model.Sorts;
087
088/**
089 * Implementation of the {@link AuditBackend} interface using MongoDB persistence.
090 *
091 * @since 9.1
092 */
093public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend {
094
095    private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class);
096
097    public static final String AUDIT_DATABASE_ID = "audit";
098
099    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name";
100
101    public static final String DEFAULT_COLLECTION_NAME = "audit";
102
103    public static final String SEQ_NAME = "audit";
104
105    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
106
107    protected MongoCollection<Document> collection;
108
109    protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider();
110
111    protected CursorService<MongoCursor<Document>, Document, String> cursorService;
112
113    public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
114        super(component, config);
115    }
116
117    /**
118     * @since 9.3
119     */
120    public MongoDBAuditBackend() {
121        super();
122    }
123
124    @Override
125    public int getApplicationStartedOrder() {
126        DefaultComponent component = (DefaultComponent) Framework.getRuntime()
127                                                                 .getComponent(MongoDBComponent.COMPONENT_NAME);
128        return component.getApplicationStartedOrder() + 1;
129    }
130
131    @Override
132    public void onApplicationStarted() {
133        log.info("Activate MongoDB backend for Audit");
134        // First retrieve the collection name
135        ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
136        String collName = configurationService.getString(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
137        // Get a connection to MongoDB
138        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
139        MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID);
140        collection = database.getCollection(collName);
141        collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id
142        collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range
143        collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event
144        collection.createIndex(Indexes.ascending(LOG_DOC_PATH)); // query by path
145        cursorService = new CursorService<>(doc -> {
146            Object id = doc.remove(MONGODB_ID);
147            if (id != null) {
148                doc.put(LOG_ID, id);
149            }
150            return doc.toJson();
151        });
152    }
153
154    @Override
155    public void onApplicationStopped() {
156        collection = null;
157        cursorService.clear();
158        cursorService = null;
159    }
160
161    /**
162     * @return the {@link MongoCollection} configured with audit settings.
163     */
164    public MongoCollection<Document> getAuditCollection() {
165        return collection;
166    }
167
168    @Override
169    public List<LogEntry> queryLogs(QueryBuilder builder) {
170        // prepare parameters
171        MultiExpression predicate = builder.predicate();
172        OrderByList orders = builder.orders();
173        long offset = builder.offset();
174        long limit = builder.limit();
175
176        // create MongoDB filter
177        Bson mgFilter = createFilter(predicate);
178
179        // create MongoDB order
180        Bson mgOrder = createSort(orders);
181
182        logRequest(mgFilter, mgOrder);
183        FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit);
184        return buildLogEntries(iterable);
185    }
186
187    protected Bson createFilter(MultiExpression andPredicate) {
188        // cast parameters
189        // current implementation only support a MultiExpression with AND operator
190        List<Predicate> predicates = andPredicate.predicates;
191        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
192        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
193        getFieldName = getFieldName.andThen(this::getMongoDBKey);
194
195        List<Bson> filterList = new ArrayList<>(predicates.size());
196        for (Predicate predicate : predicates) {
197            String leftName = getFieldName.apply(predicate.lvalue);
198            Operator operator = predicate.operator;
199            Object rightValue = Literals.valueOf(predicate.rvalue);
200            if (rightValue instanceof ZonedDateTime) {
201                // The ZonedDateTime representation is not compatible with MongoDB query
202                rightValue = Date.from(((ZonedDateTime) rightValue).toInstant());
203            }
204            if (Operator.EQ.equals(operator)) {
205                filterList.add(Filters.eq(leftName, rightValue));
206            } else if (Operator.NOTEQ.equals(operator)) {
207                filterList.add(Filters.ne(leftName, rightValue));
208            } else if (Operator.LT.equals(operator)) {
209                filterList.add(Filters.lt(leftName, rightValue));
210            } else if (Operator.LTEQ.equals(operator)) {
211                filterList.add(Filters.lte(leftName, rightValue));
212            } else if (Operator.GTEQ.equals(operator)) {
213                filterList.add(Filters.gte(leftName, rightValue));
214            } else if (Operator.GT.equals(operator)) {
215                filterList.add(Filters.gt(leftName, rightValue));
216            } else if (Operator.IN.equals(operator)) {
217                filterList.add(Filters.in(leftName, (List<?>) rightValue));
218            } else if (Operator.STARTSWITH.equals(operator)) {
219                filterList.add(Filters.regex(leftName, "^" + Pattern.quote(String.valueOf(rightValue))));
220            }
221        }
222        return Filters.and(filterList);
223    }
224
225    protected Bson createSort(OrderByList orders) {
226        List<Bson> orderList = new ArrayList<>(orders.size());
227        for (OrderByExpr order : orders) {
228            String name = getMongoDBKey(order.reference.name);
229            if (order.isDescending) {
230                orderList.add(Sorts.descending(name));
231            } else {
232                orderList.add(Sorts.ascending(name));
233            }
234        }
235        return Sorts.orderBy(orderList);
236    }
237
238    protected String getMongoDBKey(String key) {
239        if (LOG_ID.equals(key)) {
240            return MONGODB_ID;
241        }
242        return key;
243    }
244
245    @Override
246    public LogEntry getLogEntryByID(long id) {
247        Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first();
248        if (document == null) {
249            return null;
250        }
251        return MongoDBAuditEntryReader.read(document);
252    }
253
254    @Override
255    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
256        Bson filter = buildFilter(query, params);
257        logRequest(filter, pageNb, pageSize);
258        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
259        return buildLogEntries(iterable);
260    }
261
262    public Bson buildFilter(String query, Map<String, Object> params) {
263        if (params != null && params.size() > 0) {
264            query = expandQueryVariables(query, params);
265        }
266        return Document.parse(query);
267    }
268
269    public String expandQueryVariables(String query, Object[] params) {
270        Map<String, Object> qParams = new HashMap<>();
271        for (int i = 0; i < params.length; i++) {
272            query = query.replaceFirst("\\?", "\\${param" + i + "}");
273            qParams.put("param" + i, params[i]);
274        }
275        return expandQueryVariables(query, qParams);
276    }
277
278    public String expandQueryVariables(String query, Map<String, Object> params) {
279        if (params != null && params.size() > 0) {
280            TextTemplate tmpl = new TextTemplate();
281            // MongoDB date formatter - copied from org.bson.json.JsonWriter
282            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'");
283            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
284            for (Entry<String, Object> entry : params.entrySet()) {
285                String key = entry.getKey();
286                Object value = entry.getValue();
287                if (value instanceof Calendar) {
288                    tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime()));
289                } else if (value instanceof Date) {
290                    tmpl.setVariable(key, dateFormat.format(value));
291                } else if (value != null) {
292                    tmpl.setVariable(key, value.toString());
293                }
294            }
295            query = tmpl.processText(query);
296        }
297        return query;
298    }
299
300    @Override
301    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
302            int pageSize) {
303        List<Bson> list = new ArrayList<>();
304        if (eventIds != null && eventIds.length > 0) {
305            if (eventIds.length == 1) {
306                list.add(Filters.eq(LOG_EVENT_ID, eventIds[0]));
307            } else {
308                list.add(Filters.in(LOG_EVENT_ID, eventIds));
309            }
310        }
311        if (categories != null && categories.length > 0) {
312            if (categories.length == 1) {
313                list.add(Filters.eq(LOG_CATEGORY, categories[0]));
314            } else {
315                list.add(Filters.in(LOG_CATEGORY, categories));
316            }
317        }
318        if (path != null) {
319            list.add(Filters.eq(LOG_DOC_PATH, path));
320        }
321        if (limit != null) {
322            list.add(Filters.lt(LOG_EVENT_DATE, limit));
323        }
324        Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list);
325        logRequest(filter, pageNb, pageSize);
326        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
327        return buildLogEntries(iterable);
328    }
329
330    @Override
331    public void addLogEntries(List<LogEntry> entries) {
332        if (entries.isEmpty()) {
333            return;
334        }
335
336        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
337        UIDSequencer seq = uidGeneratorService.getSequencer();
338
339        List<Document> documents = new ArrayList<>(entries.size());
340        List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size());
341        for (int i = 0; i < entries.size(); i++) {
342            LogEntry entry = entries.get(i);
343            entry.setId(block.get(i));
344            if (log.isDebugEnabled()) {
345                log.debug(String.format("Indexing log entry Id: %s, with logDate : %s, for docUUID: %s ",
346                        Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID()));
347            }
348            documents.add(MongoDBAuditEntryWriter.asDocument(entry));
349        }
350        collection.insertMany(documents);
351    }
352
353    @Override
354    public Long getEventsCount(String eventId) {
355        return Long.valueOf(collection.countDocuments(Filters.eq("eventId", eventId)));
356    }
357
358    @Override
359    public long syncLogCreationEntries(String repoId, String path, Boolean recurs) {
360        return syncLogCreationEntries(provider, repoId, path, recurs);
361    }
362
363    @Override
364    public ExtendedInfo newExtendedInfo(Serializable value) {
365        return new MongoDBExtendedInfo(value);
366    }
367
368    private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) {
369        return StreamSupport.stream(iterable.spliterator(), false)
370                            .map(MongoDBAuditEntryReader::read)
371                            .collect(Collectors.toList());
372    }
373
374    private void logRequest(Bson filter, Bson orderBy) {
375        if (log.isDebugEnabled()) {
376            log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy));
377        }
378    }
379
380    private void logRequest(Bson filter, int pageNb, int pageSize) {
381        if (log.isDebugEnabled()) {
382            log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize);
383        }
384    }
385
386    @Override
387    public void append(List<String> jsonEntries) {
388        // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int
389        List<Document> entries = new ArrayList<>();
390        for (String json : jsonEntries) {
391            try {
392                LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class);
393                if (entry.getId() == 0) {
394                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
395                }
396                Document doc = MongoDBAuditEntryWriter.asDocument(entry);
397                entries.add(doc);
398            } catch (IOException e) {
399                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
400            }
401        }
402        collection.insertMany(entries);
403    }
404
405    @SuppressWarnings("resource") // CursorResult is being registered, must not be closed
406    @Override
407    public ScrollResult<String> scroll(QueryBuilder builder, int batchSize, int keepAliveSeconds) {
408        // prepare parameters
409        MultiExpression predicate = builder.predicate();
410        OrderByList orders = builder.orders();
411
412        // create MongoDB filter
413        Bson mgFilter = createFilter(predicate);
414
415        // create MongoDB order
416        Bson mgOrder = createSort(orders);
417
418        logRequest(mgFilter, mgOrder);
419        MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator();
420        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
421        return scroll(scrollId);
422    }
423
424    @Override
425    public ScrollResult<String> scroll(String scrollId) {
426        return cursorService.scroll(scrollId);
427    }
428
429    public class MongoDBLogEntryProvider implements BaseLogEntryProvider {
430
431        @Override
432        public int removeEntries(String eventId, String pathPattern) {
433            throw new UnsupportedOperationException("Not implemented yet!");
434        }
435
436        @Override
437        public void addLogEntry(LogEntry logEntry) {
438            List<LogEntry> entries = new ArrayList<>();
439            entries.add(logEntry);
440            addLogEntries(entries);
441        }
442
443    }
444
445}