001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.listener;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import javax.naming.NamingException;
028import javax.transaction.RollbackException;
029import javax.transaction.Status;
030import javax.transaction.Synchronization;
031import javax.transaction.SystemException;
032import javax.transaction.TransactionManager;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.event.Event;
037import org.nuxeo.ecm.core.event.EventListener;
038import org.nuxeo.ecm.core.io.registry.MarshallerHelper;
039import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
040import org.nuxeo.ecm.platform.audit.api.AuditLogger;
041import org.nuxeo.ecm.platform.audit.api.LogEntry;
042import org.nuxeo.lib.stream.computation.Record;
043import org.nuxeo.lib.stream.log.LogAppender;
044import org.nuxeo.lib.stream.log.LogManager;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.stream.StreamService;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049/**
050 * An events collector that write log entries as json record into a stream.
051 *
052 * @since 9.3
053 */
054public class StreamAuditEventListener implements EventListener, Synchronization {
055    private static final Log log = LogFactory.getLog(StreamAuditEventListener.class);
056
057    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);
058
059    protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new);
060
061    public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled";
062
063    public static final String AUDIT_LOG_CONFIG_PROP = "nuxeo.stream.audit.log.config";
064
065    public static final String DEFAULT_LOG_CONFIG = "audit";
066
067    public static final String STREAM_NAME = "audit";
068
069    @Override
070    public void handleEvent(Event event) {
071        AuditLogger logger = Framework.getService(AuditLogger.class);
072        if (logger == null) {
073            return;
074        }
075        if (!isEnlisted.get()) {
076            isEnlisted.set(registerSynchronization(this));
077            entries.get().clear();
078            if (log.isDebugEnabled()) {
079                log.debug("AuditEventListener collecting entries for the tx");
080            }
081        }
082        if (logger.getAuditableEventNames().contains(event.getName())) {
083            entries.get().add(logger.buildEntryFromEvent(event));
084        }
085        if (!isEnlisted.get()) {
086            // there is no transaction so don't wait for a commit
087            afterCompletion(Status.STATUS_COMMITTED);
088        }
089
090    }
091
092    @Override
093    public void beforeCompletion() {
094        if (log.isDebugEnabled()) {
095            log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size()));
096        }
097    }
098
099    @Override
100    public void afterCompletion(int status) {
101        try {
102            if (entries.get().isEmpty()
103                    || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) {
104                // This means that in case of rollback there is no event logged
105                return;
106            }
107            writeEntries();
108            if (log.isDebugEnabled()) {
109                log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size()));
110            }
111        } finally {
112            isEnlisted.set(false);
113            entries.get().clear();
114        }
115    }
116
117    protected void writeEntries() {
118        if (entries.get().isEmpty()) {
119            return;
120        }
121        LogAppender<Record> appender = getLogManager().getAppender(STREAM_NAME);
122        entries.get().forEach(entry -> writeEntry(appender, entry));
123    }
124
125    protected void writeEntry(LogAppender<Record> appender, LogEntry entry) {
126        String json = asJson(entry);
127        if (json == null) {
128            return;
129        }
130        appender.append(0, Record.of(String.valueOf(entry.getId()), json.getBytes(UTF_8)));
131    }
132
133    protected String asJson(LogEntry entry) {
134        if (entry == null) {
135            return null;
136        }
137        RenderingContext ctx = RenderingContext.CtxBuilder.get();
138        try {
139            return MarshallerHelper.objectToJson(entry, ctx);
140        } catch (IOException e) {
141            log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e);
142            return null;
143        }
144    }
145
146    protected boolean registerSynchronization(Synchronization sync) {
147        try {
148            TransactionManager tm = TransactionHelper.lookupTransactionManager();
149            if (tm != null) {
150                if (tm.getTransaction() != null) {
151                    tm.getTransaction().registerSynchronization(sync);
152                    return true;
153                }
154                return false;
155            } else {
156                log.error("Unable to register synchronization : no TransactionManager");
157                return false;
158            }
159        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
160            log.error("Unable to register synchronization", e);
161            return false;
162        }
163    }
164
165    protected LogManager getLogManager() {
166        StreamService service = Framework.getService(StreamService.class);
167        return service.getLogManager(getLogConfig());
168    }
169
170    protected String getLogConfig() {
171        return Framework.getProperty(AUDIT_LOG_CONFIG_PROP, DEFAULT_LOG_CONFIG);
172    }
173}