001/*
002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 */
019package org.nuxeo.ecm.platform.audit.service;
020
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.platform.audit.api.LogEntry;
033import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor;
034import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.management.ResourcePublisher;
037import org.nuxeo.runtime.metrics.MetricsService;
038
039import com.codahale.metrics.Counter;
040import com.codahale.metrics.Gauge;
041import com.codahale.metrics.MetricRegistry;
042import com.codahale.metrics.SharedMetricRegistries;
043
044public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker {
045
046    final Log log = LogFactory.getLog(DefaultAuditBulker.class);
047
048    final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
049
050    final Gauge<Integer> sizeGauge = new Gauge<Integer>() {
051
052        @Override
053        public Integer getValue() {
054            return queue.size();
055        }
056
057    };
058
059    final AuditBackend backend;
060
061    final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued"));
062
063    final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained"));
064
065    int timeout;
066
067    int bulksize;
068
069    Thread thread;
070
071    DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) {
072        this.backend = backend;
073        timeout = config.timeout;
074        bulksize = config.size;
075    }
076
077    @Override
078    public void onApplicationStarted() {
079        thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker");
080        thread.start();
081        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
082        if (publisher != null) {
083            publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this);
084        }
085        registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge);
086    }
087
088    @Override
089    public void onShutdown() {
090        registry.remove(MetricRegistry.name("nuxeo", "audit", "size"));
091        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
092        if (publisher != null) {
093            publisher.unregisterResource("audit-bulker", "audit-bulker");
094        }
095        stopped = true;
096        try {
097            thread.interrupt();
098        } finally {
099            thread = null;
100        }
101    }
102
103    final AtomicInteger size = new AtomicInteger(0);
104
105    final ReentrantLock lock = new ReentrantLock();
106
107    final Condition isEmpty = lock.newCondition();
108
109    final Condition isFilled = lock.newCondition();
110
111    final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>();
112
113    volatile boolean stopped;
114
115    @Override
116    public void offer(LogEntry entry) {
117        if (log.isDebugEnabled()) {
118            log.debug("offered " + entry);
119        }
120        queue.add(entry);
121        queuedCount.inc();
122
123        if (size.incrementAndGet() >= bulksize) {
124            lock.lock();
125            try {
126                isFilled.signalAll();
127            } finally {
128                lock.unlock();
129            }
130        }
131    }
132
133    @Override
134    public boolean await(long time, TimeUnit unit) throws InterruptedException {
135        if (queue.isEmpty()) {
136            return true;
137        }
138        lock.lock();
139        try {
140            isFilled.signalAll();
141            return isEmpty.await(time, unit);
142        } finally {
143            lock.unlock();
144        }
145    }
146
147    int drain() {
148        List<LogEntry> entries = new LinkedList<>();
149        while (!queue.isEmpty()) {
150            entries.add(queue.remove());
151        }
152        backend.addLogEntries(entries);
153        int delta = entries.size();
154        size.addAndGet(-delta);
155        drainedCount.inc(delta);
156        if (queue.isEmpty()) {
157            lock.lock();
158            try {
159                isEmpty.signalAll();
160            } finally {
161                lock.unlock();
162            }
163        }
164        return delta;
165    }
166
167    class Consumer implements Runnable {
168
169        @Override
170        public void run() {
171            log.info("bulk audit logger started");
172            while (!stopped) {
173                lock.lock();
174                try {
175                    isFilled.await(timeout, TimeUnit.MILLISECONDS);
176                    if (queue.isEmpty()) {
177                        continue;
178                    }
179                } catch (InterruptedException cause) {
180                    Thread.currentThread().interrupt();
181                    return;
182                } finally {
183                    lock.unlock();
184                }
185                try {
186                    int count = drain();
187                    if (log.isDebugEnabled()) {
188                        log.debug("flushed " + count + " events");
189                    }
190                } catch (RuntimeException cause) {
191                    log.error("caught error while draining audit queue", cause);
192                }
193            }
194            log.info("bulk audit logger stopped");
195        }
196
197    }
198
199    @Override
200    public int getBulkTimeout() {
201        return timeout;
202    }
203
204    @Override
205    public void setBulkTimeout(int value) {
206        timeout = value;
207    }
208
209    @Override
210    public int getBulkSize() {
211        return bulksize;
212    }
213
214    @Override
215    public void setBulkSize(int value) {
216        bulksize = value;
217    }
218
219    @Override
220    public void resetMetrics() {
221        queuedCount.dec(queuedCount.getCount());
222        drainedCount.dec(drainedCount.getCount());
223    }
224}