001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.impl;
020
import static org.nuxeo.ecm.platform.audit.listener.StreamAuditEventListener.STREAM_NAME;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.platform.audit.api.AuditLogger;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

import com.fasterxml.jackson.databind.ObjectMapper;
043
044/**
045 * Computation that consumes a stream of log entries and write them to the audit backend.
046 *
047 * @since 9.3
048 */
049public class StreamAuditWriter implements StreamProcessorTopology {
050    private static final Log log = LogFactory.getLog(StreamAuditWriter.class);
051
052    public static final String COMPUTATION_NAME = "AuditLogWriter";
053
054    public static final String BATCH_SIZE_OPT = "batchSize";
055
056    public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs";
057
058    public static final int DEFAULT_BATCH_SIZE = 10;
059
060    public static final int DEFAULT_BATCH_THRESHOLD_MS = 200;
061
062    @Override
063    public Topology getTopology(Map<String, String> options) {
064        int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE);
065        int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS);
066        return Topology.builder()
067                       .addComputation(
068                               () -> new AuditLogWriterComputation(COMPUTATION_NAME, batchSize, batchThresholdMs),
069                               Collections.singletonList("i1:" + STREAM_NAME))
070                       .build();
071    }
072
073    public static class AuditLogWriterComputation extends AbstractComputation {
074        protected final int batchSize;
075
076        protected final int batchThresholdMs;
077
078        protected final List<LogEntry> logEntries;
079
080        public AuditLogWriterComputation(String name, int batchSize, int batchThresholdMs) {
081            super(name, 1, 0);
082            this.batchSize = batchSize;
083            this.batchThresholdMs = batchThresholdMs;
084            logEntries = new ArrayList<>(batchSize);
085        }
086
087        @Override
088        public void init(ComputationContext context) {
089            log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms",
090                    COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs));
091            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
092        }
093
094        @Override
095        public void processTimer(ComputationContext context, String key, long timestamp) {
096            writeEntriesToAudit(context);
097            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
098        }
099
100        @Override
101        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
102            try {
103                logEntries.add(getLogEntryFromJson(record.data));
104            } catch (NuxeoException e) {
105                log.error("Discard invalid record: " + record, e);
106                return;
107            }
108            if (logEntries.size() >= batchSize) {
109                writeEntriesToAudit(context);
110            }
111        }
112
113        @Override
114        public void destroy() {
115            log.debug(
116                    String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME, logEntries.size()));
117        }
118
119        protected void writeEntriesToAudit(ComputationContext context) {
120            if (logEntries.isEmpty()) {
121                return;
122            }
123            if (log.isDebugEnabled()) {
124                log.debug(String.format("Writing %d log entries to audit backend.", logEntries.size()));
125            }
126            AuditLogger logger = Framework.getService(AuditLogger.class);
127            logger.addLogEntries(logEntries);
128            logEntries.clear();
129            context.askForCheckpoint();
130        }
131
132        protected LogEntry getLogEntryFromJson(byte[] data) {
133            String json = "";
134            try {
135                json = new String(data, "UTF-8");
136                ObjectMapper mapper = new ObjectMapper();
137                return mapper.readValue(json, LogEntryImpl.class);
138            } catch (UnsupportedEncodingException e) {
139                throw new NuxeoException("Discard log entry, invalid byte array", e);
140            } catch (IOException e) {
141                throw new NuxeoException("Invalid json logEntry" + json, e);
142            }
143        }
144    }
145
146    protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) {
147        String value = options.get(option);
148        return value == null ? defaultValue : Integer.valueOf(value);
149    }
150
151}