001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.audit.listener; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026 027import javax.naming.NamingException; 028import javax.transaction.RollbackException; 029import javax.transaction.Status; 030import javax.transaction.Synchronization; 031import javax.transaction.SystemException; 032import javax.transaction.TransactionManager; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.event.Event; 037import org.nuxeo.ecm.core.event.EventListener; 038import org.nuxeo.ecm.core.io.registry.MarshallerHelper; 039import org.nuxeo.ecm.core.io.registry.context.RenderingContext; 040import org.nuxeo.ecm.platform.audit.api.AuditLogger; 041import org.nuxeo.ecm.platform.audit.api.LogEntry; 042import org.nuxeo.lib.stream.computation.Record; 043import org.nuxeo.lib.stream.log.LogAppender; 044import org.nuxeo.lib.stream.log.LogManager; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.stream.StreamService; 047import org.nuxeo.runtime.transaction.TransactionHelper; 048 049/** 050 * An events collector that write log entries as json record into a stream. 051 * 052 * @since 9.3 053 */ 054public class StreamAuditEventListener implements EventListener, Synchronization { 055 private static final Log log = LogFactory.getLog(StreamAuditEventListener.class); 056 057 protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE); 058 059 protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new); 060 061 public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled"; 062 063 public static final String AUDIT_LOG_CONFIG_PROP = "nuxeo.stream.audit.log.config"; 064 065 public static final String DEFAULT_LOG_CONFIG = "audit"; 066 067 public static final String STREAM_NAME = "audit"; 068 069 @Override 070 public void handleEvent(Event event) { 071 AuditLogger logger = Framework.getService(AuditLogger.class); 072 if (logger == null) { 073 return; 074 } 075 if (!isEnlisted.get()) { 076 isEnlisted.set(registerSynchronization(this)); 077 entries.get().clear(); 078 if (log.isDebugEnabled()) { 079 log.debug("AuditEventListener collecting entries for the tx"); 080 } 081 } 082 if (logger.getAuditableEventNames().contains(event.getName())) { 083 entries.get().add(logger.buildEntryFromEvent(event)); 084 } 085 if (!isEnlisted.get()) { 086 // there is no transaction so don't wait for a commit 087 afterCompletion(Status.STATUS_COMMITTED); 088 } 089 090 } 091 092 @Override 093 public void beforeCompletion() { 094 if (log.isDebugEnabled()) { 095 log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size())); 096 } 097 } 098 099 @Override 100 public void afterCompletion(int status) { 101 try { 102 if (entries.get().isEmpty() 103 || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) { 104 // This means that in case of rollback there is no event logged 105 return; 106 } 107 writeEntries(); 108 if (log.isDebugEnabled()) { 109 log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size())); 110 } 111 } finally { 112 isEnlisted.set(false); 113 entries.get().clear(); 114 } 115 } 116 117 protected void writeEntries() { 118 if (entries.get().isEmpty()) { 119 return; 120 } 121 LogAppender<Record> appender = getLogManager().getAppender(STREAM_NAME); 122 entries.get().forEach(entry -> writeEntry(appender, entry)); 123 } 124 125 protected void writeEntry(LogAppender<Record> appender, LogEntry entry) { 126 String json = asJson(entry); 127 if (json == null) { 128 return; 129 } 130 appender.append(0, Record.of(String.valueOf(entry.getId()), json.getBytes(UTF_8))); 131 } 132 133 protected String asJson(LogEntry entry) { 134 if (entry == null) { 135 return null; 136 } 137 RenderingContext ctx = RenderingContext.CtxBuilder.get(); 138 try { 139 return MarshallerHelper.objectToJson(entry, ctx); 140 } catch (IOException e) { 141 log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e); 142 return null; 143 } 144 } 145 146 protected boolean registerSynchronization(Synchronization sync) { 147 try { 148 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 149 if (tm != null) { 150 if (tm.getTransaction() != null) { 151 tm.getTransaction().registerSynchronization(sync); 152 return true; 153 } 154 return false; 155 } else { 156 log.error("Unable to register synchronization : no TransactionManager"); 157 return false; 158 } 159 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 160 log.error("Unable to register synchronization", e); 161 return false; 162 } 163 } 164 165 protected LogManager getLogManager() { 166 StreamService service = Framework.getService(StreamService.class); 167 return service.getLogManager(getLogConfig()); 168 } 169 170 protected String getLogConfig() { 171 return Framework.getProperty(AUDIT_LOG_CONFIG_PROP, DEFAULT_LOG_CONFIG); 172 } 173}