001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.audit.listener;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import javax.naming.NamingException;
028import javax.transaction.RollbackException;
029import javax.transaction.Status;
030import javax.transaction.Synchronization;
031import javax.transaction.SystemException;
032import javax.transaction.TransactionManager;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.event.Event;
037import org.nuxeo.ecm.core.event.EventListener;
038import org.nuxeo.ecm.core.io.registry.MarshallerHelper;
039import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
040import org.nuxeo.ecm.platform.audit.api.AuditLogger;
041import org.nuxeo.ecm.platform.audit.api.LogEntry;
042import org.nuxeo.lib.stream.computation.Record;
043import org.nuxeo.lib.stream.computation.StreamManager;
044import org.nuxeo.lib.stream.computation.Watermark;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.stream.StreamService;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049/**
050 * An events collector that write log entries as json record into a stream.
051 *
052 * @since 9.3
053 */
054public class StreamAuditEventListener implements EventListener, Synchronization {
055    private static final Log log = LogFactory.getLog(StreamAuditEventListener.class);
056
057    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);
058
059    protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new);
060
061    public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled";
062
063    public static final String STREAM_NAME = "audit/audit";
064
065    @Deprecated
066    // @deprecated since 11.1 log config is not needed anymore
067    public static final String DEFAULT_LOG_CONFIG = "audit";
068
069    @Override
070    public void handleEvent(Event event) {
071        AuditLogger logger = Framework.getService(AuditLogger.class);
072        if (logger == null) {
073            return;
074        }
075        if (!isEnlisted.get()) {
076            isEnlisted.set(registerSynchronization(this));
077            entries.get().clear();
078            if (log.isDebugEnabled()) {
079                log.debug("AuditEventListener collecting entries for the tx");
080            }
081        }
082        if (logger.getAuditableEventNames().contains(event.getName())) {
083            entries.get().add(logger.buildEntryFromEvent(event));
084        }
085        if (!isEnlisted.get()) {
086            // there is no transaction so don't wait for a commit
087            afterCompletion(Status.STATUS_COMMITTED);
088        }
089
090    }
091
092    @Override
093    public void beforeCompletion() {
094        if (log.isDebugEnabled()) {
095            log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size()));
096        }
097    }
098
099    @Override
100    public void afterCompletion(int status) {
101        try {
102            if (entries.get().isEmpty()
103                    || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) {
104                // This means that in case of rollback there is no event logged
105                return;
106            }
107            writeEntries();
108            if (log.isDebugEnabled()) {
109                log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size()));
110            }
111        } finally {
112            isEnlisted.set(false);
113            entries.get().clear();
114        }
115    }
116
117    protected void writeEntries() {
118        if (entries.get().isEmpty()) {
119            return;
120        }
121        StreamManager streamManager = getStreamManager();
122        for (LogEntry entry : entries.get()) {
123            if (entry != null) {
124                Record record = recordOf(entry);
125                streamManager.append(STREAM_NAME, record);
126            }
127        }
128    }
129
130    protected Record recordOf(LogEntry entry) {
131        String json = asJson(entry);
132        if (json == null) {
133            return null;
134        }
135        long timestamp = getTimestampForEntry(entry);
136        return new Record(String.valueOf(entry.getId()), json.getBytes(UTF_8),
137                Watermark.ofTimestamp(timestamp).getValue());
138    }
139
140    protected long getTimestampForEntry(LogEntry entry) {
141        if (entry.getEventDate() != null) {
142            return entry.getEventDate().getTime();
143        }
144        return System.currentTimeMillis();
145    }
146
147    protected String asJson(LogEntry entry) {
148        if (entry == null) {
149            return null;
150        }
151        RenderingContext ctx = RenderingContext.CtxBuilder.get();
152        try {
153            return MarshallerHelper.objectToJson(entry, ctx);
154        } catch (IOException e) {
155            log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e);
156            return null;
157        }
158    }
159
160    protected boolean registerSynchronization(Synchronization sync) {
161        try {
162            TransactionManager tm = TransactionHelper.lookupTransactionManager();
163            if (tm != null) {
164                if (tm.getTransaction() != null) {
165                    tm.getTransaction().registerSynchronization(sync);
166                    return true;
167                }
168                return false;
169            } else {
170                log.error("Unable to register synchronization : no TransactionManager");
171                return false;
172            }
173        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
174            log.error("Unable to register synchronization", e);
175            return false;
176        }
177    }
178
179    protected StreamManager getStreamManager() {
180        StreamService service = Framework.getService(StreamService.class);
181        return service.getStreamManager();
182    }
183}