001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.impl;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.nuxeo.ecm.platform.audit.listener.StreamAuditEventListener.STREAM_NAME;
023
024import java.io.IOException;
025import java.io.UnsupportedEncodingException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.NuxeoException;
034import org.nuxeo.ecm.platform.audit.api.AuditLogger;
035import org.nuxeo.ecm.platform.audit.api.LogEntry;
036import org.nuxeo.lib.stream.computation.AbstractBatchComputation;
037import org.nuxeo.lib.stream.computation.ComputationContext;
038import org.nuxeo.lib.stream.computation.Record;
039import org.nuxeo.lib.stream.computation.Topology;
040import org.nuxeo.runtime.api.Framework;
041import org.nuxeo.runtime.stream.StreamProcessorTopology;
042
043import com.fasterxml.jackson.databind.ObjectMapper;
044
045/**
046 * Computation that consumes a stream of log entries and write them to the audit backend.
047 *
048 * @since 9.3
049 */
050public class StreamAuditWriter implements StreamProcessorTopology {
051    private static final Log log = LogFactory.getLog(StreamAuditWriter.class);
052
053    public static final String COMPUTATION_NAME = "AuditLogWriter";
054
055    @Override
056    public Topology getTopology(Map<String, String> options) {
057        return Topology.builder()
058                       .addComputation(
059                               () -> new AuditLogWriterComputation(COMPUTATION_NAME),
060                               Collections.singletonList("i1:" + STREAM_NAME))
061                       .build();
062    }
063
064    public static class AuditLogWriterComputation extends AbstractBatchComputation {
065
066        public AuditLogWriterComputation(String name) {
067            super(name, 1, 0);
068        }
069
070        @Override
071        public void batchProcess(ComputationContext context, String inputStreamName, List<Record> records) {
072            List<LogEntry> logEntries = new ArrayList<>(records.size());
073            for (Record record : records) {
074                try {
075                    logEntries.add(getLogEntryFromJson(record.getData()));
076                } catch (NuxeoException e) {
077                    log.error("Discard invalid record: " + record, e);
078                }
079            }
080            writeEntriesToAudit(logEntries);
081        }
082
083        @Override
084        public void batchFailure(ComputationContext context, String inputStreamName, List<Record> records) {
085            // error log already done by abstract
086        }
087
088        protected void writeEntriesToAudit(List<LogEntry> logEntries) {
089            if (logEntries.isEmpty()) {
090                return;
091            }
092            if (log.isDebugEnabled()) {
093                log.debug(String.format("Writing %d log entries to audit backend.", logEntries.size()));
094            }
095            AuditLogger logger = Framework.getService(AuditLogger.class);
096            logger.addLogEntries(logEntries);
097        }
098
099        protected LogEntry getLogEntryFromJson(byte[] data) {
100            String json = "";
101            try {
102                json = new String(data, UTF_8);
103                ObjectMapper mapper = new ObjectMapper();
104                return mapper.readValue(json, LogEntryImpl.class);
105            } catch (UnsupportedEncodingException e) {
106                throw new NuxeoException("Discard log entry, invalid byte array", e);
107            } catch (IOException e) {
108                throw new NuxeoException("Invalid json logEntry" + json, e);
109            }
110        }
111    }
112
113}