001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.listener;
020
021import java.io.IOException;
022import java.nio.charset.StandardCharsets;
023import java.util.ArrayList;
024import java.util.List;
025
026import javax.naming.NamingException;
027import javax.transaction.RollbackException;
028import javax.transaction.Status;
029import javax.transaction.Synchronization;
030import javax.transaction.SystemException;
031import javax.transaction.TransactionManager;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.core.event.Event;
036import org.nuxeo.ecm.core.event.EventListener;
037import org.nuxeo.ecm.core.io.registry.MarshallerHelper;
038import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
039import org.nuxeo.ecm.platform.audit.api.AuditLogger;
040import org.nuxeo.ecm.platform.audit.api.LogEntry;
041import org.nuxeo.lib.stream.computation.Record;
042import org.nuxeo.lib.stream.log.LogAppender;
043import org.nuxeo.lib.stream.log.LogManager;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.stream.StreamService;
046import org.nuxeo.runtime.transaction.TransactionHelper;
047
048/**
049 * An events collector that write log entries as json record into a stream.
050 *
051 * @since 9.3
052 */
053public class StreamAuditEventListener implements EventListener, Synchronization {
054    private static final Log log = LogFactory.getLog(StreamAuditEventListener.class);
055
056    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);
057
058    protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new);
059
060    public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled";
061
062    public static final String AUDIT_LOG_CONFIG_PROP = "nuxeo.stream.audit.log.config";
063
064    public static final String DEFAULT_LOG_CONFIG = "audit";
065
066    public static final String STREAM_NAME = "audit";
067
068    @Override
069    public void handleEvent(Event event) {
070        AuditLogger logger = Framework.getService(AuditLogger.class);
071        if (logger == null) {
072            return;
073        }
074        if (!isEnlisted.get()) {
075            isEnlisted.set(registerSynchronization(this));
076            entries.get().clear();
077            if (log.isDebugEnabled()) {
078                log.debug("AuditEventListener collecting entries for the tx");
079            }
080        }
081        if (logger.getAuditableEventNames().contains(event.getName())) {
082            entries.get().add(logger.buildEntryFromEvent(event));
083        }
084        if (!isEnlisted.get()) {
085            // there is no transaction so don't wait for a commit
086            afterCompletion(Status.STATUS_COMMITTED);
087        }
088
089    }
090
091    @Override
092    public void beforeCompletion() {
093        if (log.isDebugEnabled()) {
094            log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size()));
095        }
096    }
097
098    @Override
099    public void afterCompletion(int status) {
100        try {
101            if (entries.get().isEmpty()
102                    || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) {
103                // This means that in case of rollback there is no event logged
104                return;
105            }
106            writeEntries();
107            if (log.isDebugEnabled()) {
108                log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size()));
109            }
110        } finally {
111            isEnlisted.set(false);
112            entries.get().clear();
113        }
114    }
115
116    protected void writeEntries() {
117        if (entries.get().isEmpty()) {
118            return;
119        }
120        LogAppender<Record> appender = getLogManager().getAppender(STREAM_NAME);
121        entries.get().forEach(entry -> writeEntry(appender, entry));
122    }
123
124    protected void writeEntry(LogAppender<Record> appender, LogEntry entry) {
125        String json = asJson(entry);
126        if (json == null) {
127            return;
128        }
129        appender.append(0, Record.of(String.valueOf(entry.getId()), json.getBytes(StandardCharsets.UTF_8)));
130    }
131
132    protected String asJson(LogEntry entry) {
133        if (entry == null) {
134            return null;
135        }
136        RenderingContext ctx = RenderingContext.CtxBuilder.get();
137        try {
138            return MarshallerHelper.objectToJson(entry, ctx);
139        } catch (IOException e) {
140            log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e);
141            return null;
142        }
143    }
144
145    protected boolean registerSynchronization(Synchronization sync) {
146        try {
147            TransactionManager tm = TransactionHelper.lookupTransactionManager();
148            if (tm != null) {
149                if (tm.getTransaction() != null) {
150                    tm.getTransaction().registerSynchronization(sync);
151                    return true;
152                }
153                return false;
154            } else {
155                log.error("Unable to register synchronization : no TransactionManager");
156                return false;
157            }
158        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
159            log.error("Unable to register synchronization", e);
160            return false;
161        }
162    }
163
164    protected LogManager getLogManager() {
165        StreamService service = Framework.getService(StreamService.class);
166        return service.getLogManager(getLogConfig());
167    }
168
169    protected String getLogConfig() {
170        return Framework.getProperty(AUDIT_LOG_CONFIG_PROP, DEFAULT_LOG_CONFIG);
171    }
172}