001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.listener;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import javax.naming.NamingException;
028import javax.transaction.RollbackException;
029import javax.transaction.Status;
030import javax.transaction.Synchronization;
031import javax.transaction.SystemException;
032import javax.transaction.TransactionManager;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.event.Event;
037import org.nuxeo.ecm.core.event.EventListener;
038import org.nuxeo.ecm.core.io.registry.MarshallerHelper;
039import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
040import org.nuxeo.ecm.platform.audit.api.AuditLogger;
041import org.nuxeo.ecm.platform.audit.api.LogEntry;
042import org.nuxeo.lib.stream.computation.Record;
043import org.nuxeo.lib.stream.computation.Watermark;
044import org.nuxeo.lib.stream.log.LogAppender;
045import org.nuxeo.lib.stream.log.LogManager;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.stream.StreamService;
048import org.nuxeo.runtime.transaction.TransactionHelper;
049
050/**
051 * An events collector that write log entries as json record into a stream.
052 *
053 * @since 9.3
054 */
055public class StreamAuditEventListener implements EventListener, Synchronization {
056    private static final Log log = LogFactory.getLog(StreamAuditEventListener.class);
057
058    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);
059
060    protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new);
061
062    public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled";
063
064    public static final String AUDIT_LOG_CONFIG_PROP = "nuxeo.stream.audit.log.config";
065
066    public static final String DEFAULT_LOG_CONFIG = "audit";
067
068    public static final String STREAM_NAME = "audit";
069
070    @Override
071    public void handleEvent(Event event) {
072        AuditLogger logger = Framework.getService(AuditLogger.class);
073        if (logger == null) {
074            return;
075        }
076        if (!isEnlisted.get()) {
077            isEnlisted.set(registerSynchronization(this));
078            entries.get().clear();
079            if (log.isDebugEnabled()) {
080                log.debug("AuditEventListener collecting entries for the tx");
081            }
082        }
083        if (logger.getAuditableEventNames().contains(event.getName())) {
084            entries.get().add(logger.buildEntryFromEvent(event));
085        }
086        if (!isEnlisted.get()) {
087            // there is no transaction so don't wait for a commit
088            afterCompletion(Status.STATUS_COMMITTED);
089        }
090
091    }
092
093    @Override
094    public void beforeCompletion() {
095        if (log.isDebugEnabled()) {
096            log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size()));
097        }
098    }
099
100    @Override
101    public void afterCompletion(int status) {
102        try {
103            if (entries.get().isEmpty()
104                    || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) {
105                // This means that in case of rollback there is no event logged
106                return;
107            }
108            writeEntries();
109            if (log.isDebugEnabled()) {
110                log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size()));
111            }
112        } finally {
113            isEnlisted.set(false);
114            entries.get().clear();
115        }
116    }
117
118    protected void writeEntries() {
119        if (entries.get().isEmpty()) {
120            return;
121        }
122        LogAppender<Record> appender = getLogManager().getAppender(STREAM_NAME);
123        entries.get().forEach(entry -> writeEntry(appender, entry));
124    }
125
126    protected void writeEntry(LogAppender<Record> appender, LogEntry entry) {
127        String json = asJson(entry);
128        if (json == null) {
129            return;
130        }
131        long timestamp = getTimestampForEntry(entry);
132        appender.append(0, new Record(String.valueOf(entry.getId()), json.getBytes(UTF_8),
133                Watermark.ofTimestamp(timestamp).getValue()));
134    }
135
136    protected long getTimestampForEntry(LogEntry entry) {
137        if (entry.getEventDate() != null) {
138            return entry.getEventDate().getTime();
139        }
140        return System.currentTimeMillis();
141    }
142
143    protected String asJson(LogEntry entry) {
144        if (entry == null) {
145            return null;
146        }
147        RenderingContext ctx = RenderingContext.CtxBuilder.get();
148        try {
149            return MarshallerHelper.objectToJson(entry, ctx);
150        } catch (IOException e) {
151            log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e);
152            return null;
153        }
154    }
155
156    protected boolean registerSynchronization(Synchronization sync) {
157        try {
158            TransactionManager tm = TransactionHelper.lookupTransactionManager();
159            if (tm != null) {
160                if (tm.getTransaction() != null) {
161                    tm.getTransaction().registerSynchronization(sync);
162                    return true;
163                }
164                return false;
165            } else {
166                log.error("Unable to register synchronization : no TransactionManager");
167                return false;
168            }
169        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
170            log.error("Unable to register synchronization", e);
171            return false;
172        }
173    }
174
175    protected LogManager getLogManager() {
176        StreamService service = Framework.getService(StreamService.class);
177        return service.getLogManager(getLogConfig());
178    }
179
180    protected String getLogConfig() {
181        return Framework.getProperty(AUDIT_LOG_CONFIG_PROP, DEFAULT_LOG_CONFIG);
182    }
183}