001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.audit.impl; 020 021import static org.nuxeo.ecm.platform.audit.listener.StreamAuditEventListener.STREAM_NAME; 022 023import java.io.IOException; 024import java.io.UnsupportedEncodingException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.platform.audit.api.AuditLogger; 034import org.nuxeo.ecm.platform.audit.api.LogEntry; 035import org.nuxeo.lib.stream.computation.AbstractComputation; 036import org.nuxeo.lib.stream.computation.ComputationContext; 037import org.nuxeo.lib.stream.computation.Record; 038import org.nuxeo.lib.stream.computation.Topology; 039import org.nuxeo.runtime.api.Framework; 040import org.nuxeo.runtime.stream.StreamProcessorTopology; 041 042import com.fasterxml.jackson.databind.ObjectMapper; 043 044/** 045 * Computation that consumes a stream of log entries and write them to the audit backend. 046 * 047 * @since 9.3 048 */ 049public class StreamAuditWriter implements StreamProcessorTopology { 050 private static final Log log = LogFactory.getLog(StreamAuditWriter.class); 051 052 public static final String COMPUTATION_NAME = "AuditLogWriter"; 053 054 public static final String BATCH_SIZE_OPT = "batchSize"; 055 056 public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs"; 057 058 public static final int DEFAULT_BATCH_SIZE = 10; 059 060 public static final int DEFAULT_BATCH_THRESHOLD_MS = 200; 061 062 @Override 063 public Topology getTopology(Map<String, String> options) { 064 int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE); 065 int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS); 066 return Topology.builder() 067 .addComputation( 068 () -> new AuditLogWriterComputation(COMPUTATION_NAME, batchSize, batchThresholdMs), 069 Collections.singletonList("i1:" + STREAM_NAME)) 070 .build(); 071 } 072 073 public static class AuditLogWriterComputation extends AbstractComputation { 074 protected final int batchSize; 075 076 protected final int batchThresholdMs; 077 078 protected final List<LogEntry> logEntries; 079 080 public AuditLogWriterComputation(String name, int batchSize, int batchThresholdMs) { 081 super(name, 1, 0); 082 this.batchSize = batchSize; 083 this.batchThresholdMs = batchThresholdMs; 084 logEntries = new ArrayList<>(batchSize); 085 } 086 087 @Override 088 public void init(ComputationContext context) { 089 log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms", 090 COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs)); 091 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 092 } 093 094 @Override 095 public void processTimer(ComputationContext context, String key, long timestamp) { 096 writeEntriesToAudit(context); 097 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 098 } 099 100 @Override 101 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 102 try { 103 logEntries.add(getLogEntryFromJson(record.data)); 104 } catch (NuxeoException e) { 105 log.error("Discard invalid record: " + record, e); 106 return; 107 } 108 if (logEntries.size() >= batchSize) { 109 writeEntriesToAudit(context); 110 } 111 } 112 113 @Override 114 public void destroy() { 115 log.debug( 116 String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME, logEntries.size())); 117 } 118 119 protected void writeEntriesToAudit(ComputationContext context) { 120 if (logEntries.isEmpty()) { 121 return; 122 } 123 if (log.isDebugEnabled()) { 124 log.debug(String.format("Writing %d log entries to audit backend.", logEntries.size())); 125 } 126 AuditLogger logger = Framework.getService(AuditLogger.class); 127 logger.addLogEntries(logEntries); 128 logEntries.clear(); 129 context.askForCheckpoint(); 130 } 131 132 protected LogEntry getLogEntryFromJson(byte[] data) { 133 String json = ""; 134 try { 135 json = new String(data, "UTF-8"); 136 ObjectMapper mapper = new ObjectMapper(); 137 return mapper.readValue(json, LogEntryImpl.class); 138 } catch (UnsupportedEncodingException e) { 139 throw new NuxeoException("Discard log entry, invalid byte array", e); 140 } catch (IOException e) { 141 throw new NuxeoException("Invalid json logEntry" + json, e); 142 } 143 } 144 } 145 146 protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) { 147 String value = options.get(option); 148 return value == null ? defaultValue : Integer.valueOf(value); 149 } 150 151}