001/* 002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo - initial API and implementation 018 */ 019package org.nuxeo.ecm.platform.audit.service; 020 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Queue; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.ReentrantLock; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.ecm.platform.audit.api.LogEntry; 033import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor; 034import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean; 035import org.nuxeo.runtime.api.Framework; 036import org.nuxeo.runtime.management.ResourcePublisher; 037import org.nuxeo.runtime.metrics.MetricsService; 038 039import io.dropwizard.metrics5.Counter; 040import io.dropwizard.metrics5.Gauge; 041import io.dropwizard.metrics5.MetricRegistry; 042import io.dropwizard.metrics5.SharedMetricRegistries; 043 044/** 045 * @deprecated since 10.10, audit bulker is now handled with nuxeo-stream, no replacement 046 */ 047@Deprecated 048public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker { 049 050 final Log log = LogFactory.getLog(DefaultAuditBulker.class); 051 052 final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 053 054 final Gauge<Integer> sizeGauge = new Gauge<Integer>() { 055 056 @Override 057 public Integer getValue() { 058 return queue.size(); 059 } 060 061 }; 062 063 final AuditBackend backend; 064 065 final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued")); 066 067 final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained")); 068 069 int timeout; 070 071 int bulksize; 072 073 Thread thread; 074 075 DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) { 076 this.backend = backend; 077 timeout = config.timeout; 078 bulksize = config.size; 079 } 080 081 @Override 082 public void onApplicationStarted() { 083 thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker"); 084 thread.start(); 085 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 086 if (publisher != null) { 087 publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this); 088 } 089 registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge); 090 } 091 092 @Override 093 public void onApplicationStopped() { 094 registry.remove(MetricRegistry.name("nuxeo", "audit", "size")); 095 ResourcePublisher publisher = Framework.getService(ResourcePublisher.class); 096 if (publisher != null) { 097 publisher.unregisterResource("audit-bulker", "audit-bulker"); 098 } 099 stopped = true; 100 try { 101 thread.interrupt(); 102 } finally { 103 thread = null; 104 } 105 } 106 107 final AtomicInteger size = new AtomicInteger(0); 108 109 final ReentrantLock lock = new ReentrantLock(); 110 111 final Condition isEmpty = lock.newCondition(); 112 113 final Condition isFilled = lock.newCondition(); 114 115 final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>(); 116 117 volatile boolean stopped; 118 119 @Override 120 public void offer(LogEntry entry) { 121 if (log.isDebugEnabled()) { 122 log.debug("offered " + entry); 123 } 124 queue.add(entry); 125 queuedCount.inc(); 126 127 if (size.incrementAndGet() >= bulksize) { 128 lock.lock(); 129 try { 130 isFilled.signalAll(); 131 } finally { 132 lock.unlock(); 133 } 134 } 135 } 136 137 @Override 138 public boolean await(long time, TimeUnit unit) throws InterruptedException { 139 lock.lock(); 140 try { 141 isFilled.signalAll(); 142 long nanos = unit.toNanos(time); 143 while (!queue.isEmpty()) { 144 if (nanos <= 0) { 145 return false; 146 } 147 nanos = isEmpty.awaitNanos(nanos); 148 } 149 return true; 150 } finally { 151 lock.unlock(); 152 } 153 } 154 155 int drain() { 156 List<LogEntry> entries = new LinkedList<>(); 157 while (!queue.isEmpty()) { 158 entries.add(queue.remove()); 159 } 160 backend.addLogEntries(entries); 161 int delta = entries.size(); 162 size.addAndGet(-delta); 163 drainedCount.inc(delta); 164 return delta; 165 } 166 167 class Consumer implements Runnable { 168 169 @Override 170 public void run() { 171 log.info("bulk audit logger started"); 172 while (!stopped) { 173 lock.lock(); 174 try { 175 isFilled.await(timeout, TimeUnit.MILLISECONDS); // NOSONAR (spurious wakeups don't matter) 176 if (queue.isEmpty()) { 177 continue; 178 } 179 int count = drain(); 180 if (log.isDebugEnabled()) { 181 log.debug("flushed " + count + " events"); 182 } 183 } catch (InterruptedException cause) { 184 Thread.currentThread().interrupt(); 185 return; 186 } finally { 187 try { 188 isEmpty.signalAll(); 189 } finally { 190 lock.unlock(); 191 } 192 } 193 } 194 log.info("bulk audit logger stopped"); 195 } 196 197 } 198 199 @Override 200 public int getBulkTimeout() { 201 return timeout; 202 } 203 204 @Override 205 public void setBulkTimeout(int value) { 206 timeout = value; 207 } 208 209 @Override 210 public int getBulkSize() { 211 return bulksize; 212 } 213 214 @Override 215 public void setBulkSize(int value) { 216 bulksize = value; 217 } 218 219 @Override 220 public void resetMetrics() { 221 queuedCount.dec(queuedCount.getCount()); 222 drainedCount.dec(drainedCount.getCount()); 223 } 224}