001/* 002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 */ 019package org.nuxeo.ecm.platform.audit.service; 020 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Queue; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.ReentrantLock; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.ecm.platform.audit.api.LogEntry; 033import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor; 034import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean; 035import org.nuxeo.runtime.api.Framework; 036import org.nuxeo.runtime.management.ResourcePublisher; 037import org.nuxeo.runtime.metrics.MetricsService; 038 039import com.codahale.metrics.Counter; 040import com.codahale.metrics.Gauge; 041import com.codahale.metrics.MetricRegistry; 042import com.codahale.metrics.SharedMetricRegistries; 043 044public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker { 045 046 final Log log = LogFactory.getLog(DefaultAuditBulker.class); 047 048 final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 049 050 final Gauge<Integer> sizeGauge = new Gauge<Integer>() { 051 052 @Override 053 public Integer getValue() { 054 return queue.size(); 055 } 056 057 }; 058 059 final AuditBackend backend; 060 061 final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued")); 062 063 final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained")); 064 065 int timeout; 066 067 int bulksize; 068 069 Thread thread; 070 071 DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) { 072 this.backend = backend; 073 timeout = config.timeout; 074 bulksize = config.size; 075 } 076 077 @Override 078 public void onApplicationStarted() { 079 thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker"); 080 thread.start(); 081 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 082 if (publisher != null) { 083 publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this); 084 } 085 registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge); 086 } 087 088 @Override 089 public void onApplicationStopped() { 090 registry.remove(MetricRegistry.name("nuxeo", "audit", "size")); 091 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 092 if (publisher != null) { 093 publisher.unregisterResource("audit-bulker", "audit-bulker"); 094 } 095 stopped = true; 096 try { 097 thread.interrupt(); 098 } finally { 099 thread = null; 100 } 101 } 102 103 final AtomicInteger size = new AtomicInteger(0); 104 105 final ReentrantLock lock = new ReentrantLock(); 106 107 final Condition isEmpty = lock.newCondition(); 108 109 final Condition isFilled = lock.newCondition(); 110 111 final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>(); 112 113 volatile boolean stopped; 114 115 @Override 116 public void offer(LogEntry entry) { 117 if (log.isDebugEnabled()) { 118 log.debug("offered " + entry); 119 } 120 queue.add(entry); 121 queuedCount.inc(); 122 123 if (size.incrementAndGet() >= bulksize) { 124 lock.lock(); 125 try { 126 isFilled.signalAll(); 127 } finally { 128 lock.unlock(); 129 } 130 } 131 } 132 133 @Override 134 public boolean await(long time, TimeUnit unit) throws InterruptedException { 135 lock.lock(); 136 try { 137 isFilled.signalAll(); 138 return isEmpty.await(time, unit); 139 } finally { 140 lock.unlock(); 141 } 142 } 143 144 int drain() { 145 List<LogEntry> entries = new LinkedList<>(); 146 while (!queue.isEmpty()) { 147 entries.add(queue.remove()); 148 } 149 backend.addLogEntries(entries); 150 int delta = entries.size(); 151 size.addAndGet(-delta); 152 drainedCount.inc(delta); 153 return delta; 154 } 155 156 class Consumer implements Runnable { 157 158 @Override 159 public void run() { 160 log.info("bulk audit logger started"); 161 while (!stopped) { 162 lock.lock(); 163 try { 164 isFilled.await(timeout, TimeUnit.MILLISECONDS); 165 if (queue.isEmpty()) { 166 continue; 167 } 168 int count = drain(); 169 if (log.isDebugEnabled()) { 170 log.debug("flushed " + count + " events"); 171 } 172 } catch (InterruptedException cause) { 173 Thread.currentThread().interrupt(); 174 return; 175 } finally { 176 isEmpty.signalAll(); 177 lock.unlock(); 178 } 179 } 180 log.info("bulk audit logger stopped"); 181 } 182 183 } 184 185 @Override 186 public int getBulkTimeout() { 187 return timeout; 188 } 189 190 @Override 191 public void setBulkTimeout(int value) { 192 timeout = value; 193 } 194 195 @Override 196 public int getBulkSize() { 197 return bulksize; 198 } 199 200 @Override 201 public void setBulkSize(int value) { 202 bulksize = value; 203 } 204 205 @Override 206 public void resetMetrics() { 207 queuedCount.dec(queuedCount.getCount()); 208 drainedCount.dec(drainedCount.getCount()); 209 } 210}