/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Estelle Giuly <egiuly@nuxeo.com>
 */
package org.nuxeo.audit.storage.stream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.nuxeo.ecm.platform.audit.listener.StreamAuditEventListener.STREAM_NAME;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.audit.storage.impl.DirectoryAuditStorage;
import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
 * Computation that consumes a stream of JSON log entries and writes them to the Directory Audit Storage.
 *
 * @since 9.10
 */
public class StreamAuditStorageWriter implements StreamProcessorTopology {
    private static final Log log = LogFactory.getLog(StreamAuditStorageWriter.class);

    public static final String COMPUTATION_NAME = "AuditStorageLogWriter";

    public static final String BATCH_SIZE_OPT = "batchSize";

    public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs";

    public static final int DEFAULT_BATCH_SIZE = 10;

    public static final int DEFAULT_BATCH_THRESHOLD_MS = 200;

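    /**
     * Builds a topology with a single computation reading the audit stream and writing the entries to the Directory
     * Audit Storage. The batch size and flush delay can be tuned with the {@code batchSize} and
     * {@code batchThresholdMs} options.
     */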
    @Override
    public Topology getTopology(Map<String, String> options) {
        int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE);
        int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS);
        return Topology.builder()
                       .addComputation(() -> new AuditStorageLogWriterComputation(COMPUTATION_NAME, batchSize,
                               batchThresholdMs), Collections.singletonList("i1:" + STREAM_NAME))
                       .build();
    }

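    /**
     * Computation buffering incoming JSON log entries and flushing them to the Directory Audit Storage, either when
     * the batch size is reached or when the batch threshold timer fires.
     */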
    public class AuditStorageLogWriterComputation extends AbstractComputation {
        protected final int batchSize;

        protected final int batchThresholdMs;

        protected final List<String> jsonEntries;

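        /**
         * @param name the computation name
         * @param batchSize the maximum number of entries buffered before a flush
         * @param batchThresholdMs the maximum delay in milliseconds between two flushes
         */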
        public AuditStorageLogWriterComputation(String name, int batchSize, int batchThresholdMs) {
            super(name, 1, 0);
            this.batchSize = batchSize;
            this.batchThresholdMs = batchThresholdMs;
            jsonEntries = new ArrayList<>(batchSize);
        }

        @Override
        public void init(ComputationContext context) {
            log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms",
                    COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs));
            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
        }

        @Override
        public void processTimer(ComputationContext context, String key, long timestamp) {
            writeJsonEntriesToAudit(context);
            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
        }

        @Override
        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
            jsonEntries.add(new String(record.getData(), UTF_8));
            if (jsonEntries.size() >= batchSize) {
                writeJsonEntriesToAudit(context);
            }
        }

        @Override
        public void destroy() {
            log.debug(String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME,
                    jsonEntries.size()));
        }

        /**
         * Stores the JSON entries in the Directory Audit Storage.
         */
        protected void writeJsonEntriesToAudit(ComputationContext context) {
            if (jsonEntries.isEmpty()) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Writing %d log entries to the directory audit storage %s.",
                        jsonEntries.size(), DirectoryAuditStorage.NAME));
            }
            NXAuditEventsService audit = (NXAuditEventsService) Framework.getRuntime()
                                                                         .getComponent(NXAuditEventsService.NAME);
            DirectoryAuditStorage storage = (DirectoryAuditStorage) audit.getAuditStorage(DirectoryAuditStorage.NAME);
            storage.append(jsonEntries);
            jsonEntries.clear();
            context.askForCheckpoint();
        }
    }

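    /**
     * Returns the option value parsed as an integer, or the default value when the option is absent.
     */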
    protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) {
        String value = options.get(option);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

}