001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.audit.listener; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026 027import javax.naming.NamingException; 028import javax.transaction.RollbackException; 029import javax.transaction.Status; 030import javax.transaction.Synchronization; 031import javax.transaction.SystemException; 032import javax.transaction.TransactionManager; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.event.Event; 037import org.nuxeo.ecm.core.event.EventListener; 038import org.nuxeo.ecm.core.io.registry.MarshallerHelper; 039import org.nuxeo.ecm.core.io.registry.context.RenderingContext; 040import org.nuxeo.ecm.platform.audit.api.AuditLogger; 041import org.nuxeo.ecm.platform.audit.api.LogEntry; 042import org.nuxeo.lib.stream.computation.Record; 043import org.nuxeo.lib.stream.computation.StreamManager; 044import org.nuxeo.lib.stream.computation.Watermark; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.stream.StreamService; 047import org.nuxeo.runtime.transaction.TransactionHelper; 048 049/** 050 * An events collector that write log entries as json record into a stream. 051 * 052 * @since 9.3 053 */ 054public class StreamAuditEventListener implements EventListener, Synchronization { 055 private static final Log log = LogFactory.getLog(StreamAuditEventListener.class); 056 057 protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE); 058 059 protected static final ThreadLocal<List<LogEntry>> entries = ThreadLocal.withInitial(ArrayList::new); 060 061 public static final String STREAM_AUDIT_ENABLED_PROP = "nuxeo.stream.audit.enabled"; 062 063 public static final String STREAM_NAME = "audit/audit"; 064 065 @Deprecated 066 // @deprecated since 11.1 log config is not needed anymore 067 public static final String DEFAULT_LOG_CONFIG = "audit"; 068 069 @Override 070 public void handleEvent(Event event) { 071 AuditLogger logger = Framework.getService(AuditLogger.class); 072 if (logger == null) { 073 return; 074 } 075 if (!isEnlisted.get()) { 076 isEnlisted.set(registerSynchronization(this)); 077 entries.get().clear(); 078 if (log.isDebugEnabled()) { 079 log.debug("AuditEventListener collecting entries for the tx"); 080 } 081 } 082 if (logger.getAuditableEventNames().contains(event.getName())) { 083 entries.get().add(logger.buildEntryFromEvent(event)); 084 } 085 if (!isEnlisted.get()) { 086 // there is no transaction so don't wait for a commit 087 afterCompletion(Status.STATUS_COMMITTED); 088 } 089 090 } 091 092 @Override 093 public void beforeCompletion() { 094 if (log.isDebugEnabled()) { 095 log.debug(String.format("AuditEventListener going to write %d entries.", entries.get().size())); 096 } 097 } 098 099 @Override 100 public void afterCompletion(int status) { 101 try { 102 if (entries.get().isEmpty() 103 || (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status)) { 104 // This means that in case of rollback there is no event logged 105 return; 106 } 107 writeEntries(); 108 if (log.isDebugEnabled()) { 109 log.debug(String.format("AuditEventListener writes %d entries.", entries.get().size())); 110 } 111 } finally { 112 isEnlisted.set(false); 113 entries.get().clear(); 114 } 115 } 116 117 protected void writeEntries() { 118 if (entries.get().isEmpty()) { 119 return; 120 } 121 StreamManager streamManager = getStreamManager(); 122 for (LogEntry entry : entries.get()) { 123 if (entry != null) { 124 Record record = recordOf(entry); 125 streamManager.append(STREAM_NAME, record); 126 } 127 } 128 } 129 130 protected Record recordOf(LogEntry entry) { 131 String json = asJson(entry); 132 if (json == null) { 133 return null; 134 } 135 long timestamp = getTimestampForEntry(entry); 136 return new Record(String.valueOf(entry.getId()), json.getBytes(UTF_8), 137 Watermark.ofTimestamp(timestamp).getValue()); 138 } 139 140 protected long getTimestampForEntry(LogEntry entry) { 141 if (entry.getEventDate() != null) { 142 return entry.getEventDate().getTime(); 143 } 144 return System.currentTimeMillis(); 145 } 146 147 protected String asJson(LogEntry entry) { 148 if (entry == null) { 149 return null; 150 } 151 RenderingContext ctx = RenderingContext.CtxBuilder.get(); 152 try { 153 return MarshallerHelper.objectToJson(entry, ctx); 154 } catch (IOException e) { 155 log.warn("Unable to translate entry into json, eventId:" + entry.getEventId() + ": " + e.getMessage(), e); 156 return null; 157 } 158 } 159 160 protected boolean registerSynchronization(Synchronization sync) { 161 try { 162 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 163 if (tm != null) { 164 if (tm.getTransaction() != null) { 165 tm.getTransaction().registerSynchronization(sync); 166 return true; 167 } 168 return false; 169 } else { 170 log.error("Unable to register synchronization : no TransactionManager"); 171 return false; 172 } 173 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 174 log.error("Unable to register synchronization", e); 175 return false; 176 } 177 } 178 179 protected StreamManager getStreamManager() { 180 StreamService service = Framework.getService(StreamService.class); 181 return service.getStreamManager(); 182 } 183}