001/* 002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 */ 019package org.nuxeo.ecm.platform.audit.service; 020 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Queue; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Condition; 027import java.util.concurrent.locks.ReentrantLock; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.ecm.platform.audit.api.LogEntry; 032import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor; 033import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean; 034import org.nuxeo.runtime.api.Framework; 035import org.nuxeo.runtime.management.ResourcePublisher; 036import org.nuxeo.runtime.metrics.MetricsService; 037 038import com.codahale.metrics.Counter; 039import com.codahale.metrics.Gauge; 040import com.codahale.metrics.MetricRegistry; 041import com.codahale.metrics.SharedMetricRegistries; 042 043public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker { 044 045 final Log log = LogFactory.getLog(DefaultAuditBulker.class); 046 047 final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 048 049 final Gauge<Integer> sizeGauge = new Gauge<Integer>() { 050 051 @Override 052 public Integer getValue() { 053 return queue.size(); 054 } 055 056 }; 057 058 final AuditBackend backend; 059 060 final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued")); 061 062 final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained")); 063 064 int timeout; 065 066 int size; 067 068 Thread thread; 069 070 DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) { 071 this.backend = backend; 072 timeout = config.timeout; 073 size = config.size; 074 } 075 076 @Override 077 public void onApplicationStarted() { 078 thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker"); 079 thread.start(); 080 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 081 if (publisher != null) { 082 publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this); 083 } 084 registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge); 085 } 086 087 @Override 088 public void onShutdown() { 089 registry.remove(MetricRegistry.name("nuxeo", "audit", "size")); 090 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 091 if (publisher != null) { 092 publisher.unregisterResource("audit-bulker", "audit-bulker"); 093 } 094 stopped = true; 095 try { 096 thread.interrupt(); 097 } finally { 098 thread = null; 099 } 100 } 101 102 final ReentrantLock lock = new ReentrantLock(); 103 104 final Condition isEmpty = lock.newCondition(); 105 106 final Condition isFilled = lock.newCondition(); 107 108 final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>(); 109 110 volatile boolean stopped; 111 112 @Override 113 public void offer(LogEntry entry) { 114 if (log.isDebugEnabled()) { 115 log.debug("offered " + entry); 116 } 117 queue.add(entry); 118 queuedCount.inc(); 119 if (queue.size() >= size) { 120 lock.lock(); 121 try { 122 isFilled.signalAll(); 123 } finally { 124 lock.unlock(); 125 } 126 } 127 } 128 129 @Override 130 public boolean await(long time, TimeUnit unit) throws InterruptedException { 131 if (queue.isEmpty()) { 132 return true; 133 } 134 lock.lock(); 135 try { 136 isFilled.signalAll(); 137 return isEmpty.await(time, unit); 138 } finally { 139 lock.unlock(); 140 } 141 } 142 143 int drain() { 144 List<LogEntry> entries = new LinkedList<>(); 145 while (!queue.isEmpty()) { 146 entries.add(queue.remove()); 147 } 148 backend.addLogEntries(entries); 149 drainedCount.inc(entries.size()); 150 if (queue.isEmpty()) { 151 lock.lock(); 152 try { 153 isEmpty.signalAll(); 154 } finally { 155 lock.unlock(); 156 } 157 } 158 return entries.size(); 159 } 160 161 class Consumer implements Runnable { 162 163 @Override 164 public void run() { 165 log.info("bulk audit logger started"); 166 while (!stopped) { 167 lock.lock(); 168 try { 169 isFilled.await(timeout, TimeUnit.MILLISECONDS); 170 if (queue.isEmpty()) { 171 continue; 172 } 173 } catch (InterruptedException cause) { 174 Thread.currentThread().interrupt(); 175 return; 176 } finally { 177 lock.unlock(); 178 } 179 try { 180 int count = drain(); 181 if (log.isDebugEnabled()) { 182 log.debug("flushed " + count + " events"); 183 } 184 } catch (RuntimeException cause) { 185 log.error("caught error while draining audit queue", cause); 186 } 187 } 188 log.info("bulk audit logger stopped"); 189 } 190 191 } 192 193 @Override 194 public int getBulkTimeout() { 195 return timeout; 196 } 197 198 @Override 199 public void setBulkTimeout(int value) { 200 timeout = value; 201 } 202 203 @Override 204 public int getBulkSize() { 205 return size; 206 } 207 208 @Override 209 public void setBulkSize(int value) { 210 size = value; 211 } 212 213 @Override 214 public void resetMetrics() { 215 queuedCount.dec(queuedCount.getCount()); 216 drainedCount.dec(drainedCount.getCount()); 217 } 218}