001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.audit.listener; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026 027import javax.naming.NamingException; 028import javax.transaction.RollbackException; 029import javax.transaction.Status; 030import javax.transaction.Synchronization; 031import javax.transaction.SystemException; 032import javax.transaction.TransactionManager; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.event.Event; 037import org.nuxeo.ecm.core.event.EventListener; 038import org.nuxeo.ecm.core.io.registry.MarshallerHelper; 039import org.nuxeo.ecm.core.io.registry.context.RenderingContext; 040import org.nuxeo.ecm.platform.audit.api.AuditLogger; 041import org.nuxeo.ecm.platform.audit.api.LogEntry; 042import org.nuxeo.lib.stream.computation.Record; 043import org.nuxeo.lib.stream.computation.Watermark; 044import org.nuxeo.lib.stream.log.LogAppender; 045import org.nuxeo.lib.stream.log.LogManager; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.stream.StreamService; 048import org.nuxeo.runtime.transaction.TransactionHelper; 049 050/** 051 * An events collector that write log entries as json record into a stream. 052 * 053 * @since 9.3 054 */ 055public class StreamAuditEventListener implements EventListener, Synchronization { 056 private static final Log log = LogFactory.getLog(StreamAuditEventListener.class); 057 058 protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE); 059 060 protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new); 061 062 public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled"; 063 064 public static final String AUDIT_LOG_CONFIG_PROP = "nuxeo.stream.audit.log.config"; 065 066 public static final String DEFAULT_LOG_CONFIG = "audit"; 067 068 public static final String STREAM_NAME = "audit"; 069 070 @Override 071 public void handleEvent(Event event) { 072 AuditLogger logger = Framework.getService(AuditLogger.class); 073 if (logger == null) { 074 return; 075 } 076 if (!isEnlisted.get()) { 077 isEnlisted.set(registerSynchronization(this)); 078 entries.get().clear(); 079 if (log.isDebugEnabled()) { 080 log.debug("AuditEventListener collecting entries for the tx"); 081 } 082 } 083 if (logger.getAuditableEventNames().contains(event.getName())) { 084 entries.get().add(logger.buildEntryFromEvent(event)); 085 } 086 if (!isEnlisted.get()) { 087 // there is no transaction so don't wait for a commit 088 afterCompletion(Status.STATUS_COMMITTED); 089 } 090 091 } 092 093 @Override 094 public void beforeCompletion() { 095 if (log.isDebugEnabled()) { 096 log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size())); 097 } 098 } 099 100 @Override 101 public void afterCompletion(int status) { 102 try { 103 if (entries.get().isEmpty() 104 || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) { 105 // This means that in case of rollback there is no event logged 106 return; 107 } 108 writeEntries(); 109 if (log.isDebugEnabled()) { 110 log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size())); 111 } 112 } finally { 113 isEnlisted.set(false); 114 entries.get().clear(); 115 } 116 } 117 118 protected void writeEntries() { 119 if (entries.get().isEmpty()) { 120 return; 121 } 122 LogAppender<Record> appender = getLogManager().getAppender(STREAM_NAME); 123 entries.get().forEach(entry -> writeEntry(appender, entry)); 124 } 125 126 protected void writeEntry(LogAppender<Record> appender, LogEntry entry) { 127 String json = asJson(entry); 128 if (json == null) { 129 return; 130 } 131 long timestamp = getTimestampForEntry(entry); 132 appender.append(0, new Record(String.valueOf(entry.getId()), json.getBytes(UTF_8), 133 Watermark.ofTimestamp(timestamp).getValue())); 134 } 135 136 protected long getTimestampForEntry(LogEntry entry) { 137 if (entry.getEventDate() != null) { 138 return entry.getEventDate().getTime(); 139 } 140 return System.currentTimeMillis(); 141 } 142 143 protected String asJson(LogEntry entry) { 144 if (entry == null) { 145 return null; 146 } 147 RenderingContext ctx = RenderingContext.CtxBuilder.get(); 148 try { 149 return MarshallerHelper.objectToJson(entry, ctx); 150 } catch (IOException e) { 151 log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e); 152 return null; 153 } 154 } 155 156 protected boolean registerSynchronization(Synchronization sync) { 157 try { 158 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 159 if (tm != null) { 160 if (tm.getTransaction() != null) { 161 tm.getTransaction().registerSynchronization(sync); 162 return true; 163 } 164 return false; 165 } else { 166 log.error("Unable to register synchronization : no TransactionManager"); 167 return false; 168 } 169 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 170 log.error("Unable to register synchronization", e); 171 return false; 172 } 173 } 174 175 protected LogManager getLogManager() { 176 StreamService service = Framework.getService(StreamService.class); 177 return service.getLogManager(getLogConfig()); 178 } 179 180 protected String getLogConfig() { 181 return Framework.getProperty(AUDIT_LOG_CONFIG_PROP, DEFAULT_LOG_CONFIG); 182 } 183}