001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.mongodb.audit;
020
021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_CATEGORY;
022import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_PATH;
023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_DOC_UUID;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_DATE;
025import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_EVENT_ID;
026import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
027import static org.nuxeo.runtime.mongodb.MongoDBSerializationHelper.MONGODB_ID;
028
029import java.io.IOException;
030import java.io.Serializable;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Calendar;
034import java.util.Date;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.TimeZone;
040import java.util.function.Function;
041import java.util.stream.Collectors;
042import java.util.stream.StreamSupport;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.bson.Document;
047import org.bson.conversions.Bson;
048import org.nuxeo.common.utils.TextTemplate;
049import org.nuxeo.ecm.core.api.CursorService;
050import org.nuxeo.ecm.core.api.NuxeoException;
051import org.nuxeo.ecm.core.api.ScrollResult;
052import org.nuxeo.ecm.core.query.sql.model.Literals;
053import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
054import org.nuxeo.ecm.core.query.sql.model.Operand;
055import org.nuxeo.ecm.core.query.sql.model.Operator;
056import org.nuxeo.ecm.core.query.sql.model.OrderByExpr;
057import org.nuxeo.ecm.core.query.sql.model.OrderByList;
058import org.nuxeo.ecm.core.query.sql.model.Predicate;
059import org.nuxeo.ecm.core.query.sql.model.Reference;
060import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
061import org.nuxeo.ecm.core.uidgen.UIDSequencer;
062import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
063import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
064import org.nuxeo.ecm.platform.audit.api.LogEntry;
065import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
066import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
067import org.nuxeo.ecm.platform.audit.service.AuditBackend;
068import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
069import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
070import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
071import org.nuxeo.runtime.api.Framework;
072import org.nuxeo.runtime.model.DefaultComponent;
073import org.nuxeo.runtime.mongodb.MongoDBComponent;
074import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
075import org.nuxeo.runtime.services.config.ConfigurationService;
076
077import com.fasterxml.jackson.databind.ObjectMapper;
078import com.mongodb.client.FindIterable;
079import com.mongodb.client.MongoCollection;
080import com.mongodb.client.MongoCursor;
081import com.mongodb.client.MongoDatabase;
082import com.mongodb.client.model.Filters;
083import com.mongodb.client.model.Indexes;
084import com.mongodb.client.model.Sorts;
085import com.mongodb.util.JSON;
086
087/**
088 * Implementation of the {@link AuditBackend} interface using MongoDB persistence.
089 *
090 * @since 9.1
091 */
092public class MongoDBAuditBackend extends AbstractAuditBackend implements AuditBackend {
093
094    private static final Log log = LogFactory.getLog(MongoDBAuditBackend.class);
095
096    public static final String AUDIT_DATABASE_ID = "audit";
097
098    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.audit.collection.name";
099
100    public static final String DEFAULT_COLLECTION_NAME = "audit";
101
102    public static final String SEQ_NAME = "audit";
103
104    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
105
106    protected MongoCollection<Document> collection;
107
108    protected MongoDBLogEntryProvider provider = new MongoDBLogEntryProvider();
109
110    protected CursorService<MongoCursor<Document>, Document, String> cursorService;
111
112    public MongoDBAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
113        super(component, config);
114    }
115
116    /**
117     * @since 9.3
118     */
119    public MongoDBAuditBackend() {
120        super();
121    }
122
123    @Override
124    public int getApplicationStartedOrder() {
125        DefaultComponent component = (DefaultComponent) Framework.getRuntime().getComponent(MongoDBComponent.NAME);
126        return component.getApplicationStartedOrder() + 1;
127    }
128
129    @Override
130    public void onApplicationStarted() {
131        log.info("Activate MongoDB backend for Audit");
132        // First retrieve the collection name
133        ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
134        String collName = configurationService.getProperty(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
135        // Get a connection to MongoDB
136        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
137        MongoDatabase database = mongoService.getDatabase(AUDIT_DATABASE_ID);
138        collection = database.getCollection(collName);
139        collection.createIndex(Indexes.ascending(LOG_DOC_UUID)); // query by doc id
140        collection.createIndex(Indexes.ascending(LOG_EVENT_DATE)); // query by date range
141        collection.createIndex(Indexes.ascending(LOG_EVENT_ID)); // query by type of event
142        cursorService = new CursorService<>(doc -> {
143            Object id = doc.remove(MONGODB_ID);
144            if (id != null) {
145                doc.put(LOG_ID, id);
146            }
147            return JSON.serialize(doc);
148        });
149    }
150
151    @Override
152    public void onApplicationStopped() {
153        collection = null;
154        cursorService.clear();
155        cursorService = null;
156    }
157
158    /**
159     * @return the {@link MongoCollection} configured with audit settings.
160     */
161    public MongoCollection<Document> getAuditCollection() {
162        return collection;
163    }
164
165    @Override
166    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
167        // prepare parameters
168        Predicate predicate = builder.predicate();
169        OrderByList orders = builder.orders();
170        long offset = builder.offset();
171        long limit = builder.limit();
172
173        // create MongoDB filter
174        Bson mgFilter = createFilter(predicate);
175
176        // create MongoDB order
177        Bson mgOrder = createSort(orders);
178
179        logRequest(mgFilter, mgOrder);
180        FindIterable<Document> iterable = collection.find(mgFilter).sort(mgOrder).skip((int) offset).limit((int) limit);
181        return buildLogEntries(iterable);
182    }
183
184    protected Bson createFilter(Predicate andPredicate) {
185        // cast parameters
186        // current implementation only support a MultiExpression with AND operator
187        @SuppressWarnings("unchecked")
188        List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values);
189        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
190        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
191        getFieldName = getFieldName.andThen(this::getMongoDBKey);
192
193        List<Bson> filterList = new ArrayList<>(predicates.size());
194        for (Predicate predicate : predicates) {
195            String leftName = getFieldName.apply(predicate.lvalue);
196            Operator operator = predicate.operator;
197            Object rightValue = Literals.valueOf(predicate.rvalue);
198            if (Operator.EQ.equals(operator)) {
199                filterList.add(Filters.eq(leftName, rightValue));
200            } else if (Operator.NOTEQ.equals(operator)) {
201                filterList.add(Filters.ne(leftName, rightValue));
202            } else if (Operator.LT.equals(operator)) {
203                filterList.add(Filters.lt(leftName, predicate.rvalue));
204            } else if (Operator.LTEQ.equals(operator)) {
205                filterList.add(Filters.lte(leftName, rightValue));
206            } else if (Operator.GTEQ.equals(operator)) {
207                filterList.add(Filters.gte(leftName, rightValue));
208            } else if (Operator.GT.equals(operator)) {
209                filterList.add(Filters.gt(leftName, rightValue));
210            } else if (Operator.IN.equals(operator)) {
211                filterList.add(Filters.in(leftName, (List<?>) rightValue));
212            }
213        }
214        return Filters.and(filterList);
215    }
216
217    protected Bson createSort(OrderByList orders) {
218        List<Bson> orderList = new ArrayList<>(orders.size());
219        for (OrderByExpr order : orders) {
220            String name = getMongoDBKey(order.reference.name);
221            if (order.isDescending) {
222                orderList.add(Sorts.descending(name));
223            } else {
224                orderList.add(Sorts.ascending(name));
225            }
226        }
227        return Sorts.orderBy(orderList);
228    }
229
230    protected String getMongoDBKey(String key) {
231        if (LOG_ID.equals(key)) {
232            return MONGODB_ID;
233        }
234        return key;
235    }
236
237    @Override
238    public LogEntry getLogEntryByID(long id) {
239        Document document = collection.find(Filters.eq(MONGODB_ID, Long.valueOf(id))).first();
240        if (document == null) {
241            return null;
242        }
243        return MongoDBAuditEntryReader.read(document);
244    }
245
246    @Override
247    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
248        Bson filter = buildFilter(query, params);
249        logRequest(filter, pageNb, pageSize);
250        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
251        return buildLogEntries(iterable);
252    }
253
254    public Bson buildFilter(String query, Map<String, Object> params) {
255        if (params != null && params.size() > 0) {
256            query = expandQueryVariables(query, params);
257        }
258        return Document.parse(query);
259    }
260
261    public String expandQueryVariables(String query, Object[] params) {
262        Map<String, Object> qParams = new HashMap<>();
263        for (int i = 0; i < params.length; i++) {
264            query = query.replaceFirst("\\?", "\\${param" + i + "}");
265            qParams.put("param" + i, params[i]);
266        }
267        return expandQueryVariables(query, qParams);
268    }
269
270    public String expandQueryVariables(String query, Map<String, Object> params) {
271        if (params != null && params.size() > 0) {
272            TextTemplate tmpl = new TextTemplate();
273            // MongoDB date formatter - copied from org.bson.json.JsonWriter
274            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'");
275            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
276            for (Entry<String, Object> entry : params.entrySet()) {
277                String key = entry.getKey();
278                Object value = entry.getValue();
279                if (value instanceof Calendar) {
280                    tmpl.setVariable(key, dateFormat.format(((Calendar) value).getTime()));
281                } else if (value instanceof Date) {
282                    tmpl.setVariable(key, dateFormat.format(value));
283                } else if (value != null) {
284                    tmpl.setVariable(key, value.toString());
285                }
286            }
287            query = tmpl.processText(query);
288        }
289        return query;
290    }
291
292    @Override
293    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
294            int pageSize) {
295        List<Bson> list = new ArrayList<>();
296        if (eventIds != null && eventIds.length > 0) {
297            if (eventIds.length == 1) {
298                list.add(Filters.eq(LOG_EVENT_ID, eventIds[0]));
299            } else {
300                list.add(Filters.in(LOG_EVENT_ID, eventIds));
301            }
302        }
303        if (categories != null && categories.length > 0) {
304            if (categories.length == 1) {
305                list.add(Filters.eq(LOG_CATEGORY, categories[0]));
306            } else {
307                list.add(Filters.in(LOG_CATEGORY, categories));
308            }
309        }
310        if (path != null) {
311            list.add(Filters.eq(LOG_DOC_PATH, path));
312        }
313        if (limit != null) {
314            list.add(Filters.lt(LOG_EVENT_DATE, limit));
315        }
316        Bson filter = list.size() == 1 ? list.get(0) : Filters.and(list);
317        logRequest(filter, pageNb, pageSize);
318        FindIterable<Document> iterable = collection.find(filter).skip(pageNb * pageSize).limit(pageSize);
319        return buildLogEntries(iterable);
320    }
321
322    @Override
323    public void addLogEntries(List<LogEntry> entries) {
324        if (entries.isEmpty()) {
325            return;
326        }
327
328        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
329        UIDSequencer seq = uidGeneratorService.getSequencer();
330
331        List<Document> documents = new ArrayList<>(entries.size());
332        for (LogEntry entry : entries) {
333            entry.setId(seq.getNextLong(SEQ_NAME));
334            if (log.isDebugEnabled()) {
335                log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
336                        Long.valueOf(entry.getId()), entry.getLogDate(), entry.getDocUUID()));
337            }
338            documents.add(MongoDBAuditEntryWriter.asDocument(entry));
339        }
340        collection.insertMany(documents);
341    }
342
343    @Override
344    public Long getEventsCount(String eventId) {
345        return Long.valueOf(collection.count(Filters.eq("eventId", eventId)));
346    }
347
348    @Override
349    public long syncLogCreationEntries(String repoId, String path, Boolean recurs) {
350        return syncLogCreationEntries(provider, repoId, path, recurs);
351    }
352
353    @Override
354    public ExtendedInfo newExtendedInfo(Serializable value) {
355        return new MongoDBExtendedInfo(value);
356    }
357
358    private List<LogEntry> buildLogEntries(FindIterable<Document> iterable) {
359        return StreamSupport.stream(iterable.spliterator(), false)
360                            .map(MongoDBAuditEntryReader::read)
361                            .collect(Collectors.toList());
362    }
363
364    private void logRequest(Bson filter, Bson orderBy) {
365        if (log.isDebugEnabled()) {
366            log.debug("MongoDB: FILTER " + filter + (orderBy == null ? "" : " ORDER BY " + orderBy));
367        }
368    }
369
370    private void logRequest(Bson filter, int pageNb, int pageSize) {
371        if (log.isDebugEnabled()) {
372            log.debug("MongoDB: FILTER " + filter + " OFFSET " + pageNb + " LIMIT " + pageSize);
373        }
374    }
375
376    @Override
377    public void append(List<String> jsonEntries) {
378        // we need to parse json with jackson first because Document#parse from mongodb driver will parse number as int
379        List<Document> entries = new ArrayList<>();
380        for (String json : jsonEntries) {
381            try {
382                LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class);
383                if (entry.getId() == 0) {
384                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
385                }
386                Document doc = MongoDBAuditEntryWriter.asDocument(entry);
387                entries.add(doc);
388            } catch (IOException e) {
389                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
390            }
391        }
392        collection.insertMany(entries);
393    }
394
395    @Override
396    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
397        // prepare parameters
398        Predicate predicate = builder.predicate();
399        OrderByList orders = builder.orders();
400
401        // create MongoDB filter
402        Bson mgFilter = createFilter(predicate);
403
404        // create MongoDB order
405        Bson mgOrder = createSort(orders);
406
407        logRequest(mgFilter, mgOrder);
408        MongoCursor<Document> cursor = collection.find(mgFilter).sort(mgOrder).batchSize(batchSize).iterator();
409        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
410        return scroll(scrollId);
411    }
412
413    @Override
414    public ScrollResult<String> scroll(String scrollId) {
415        return cursorService.scroll(scrollId);
416    }
417
418    public class MongoDBLogEntryProvider implements BaseLogEntryProvider {
419
420        @Override
421        public int removeEntries(String eventId, String pathPattern) {
422            throw new UnsupportedOperationException("Not implemented yet!");
423        }
424
425        @Override
426        public void addLogEntry(LogEntry logEntry) {
427            List<LogEntry> entries = new ArrayList<>();
428            entries.add(logEntry);
429            addLogEntries(entries);
430        }
431
432    }
433
434}