001/*
002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 */
019package org.nuxeo.ecm.platform.audit.service;
020
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.platform.audit.api.LogEntry;
033import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor;
034import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.management.ResourcePublisher;
037import org.nuxeo.runtime.metrics.MetricsService;
038
039import com.codahale.metrics.Counter;
040import com.codahale.metrics.Gauge;
041import com.codahale.metrics.MetricRegistry;
042import com.codahale.metrics.SharedMetricRegistries;
043
044public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker {
045
046    final Log log = LogFactory.getLog(DefaultAuditBulker.class);
047
048    final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
049
050    final Gauge<Integer> sizeGauge = new Gauge<Integer>() {
051
052        @Override
053        public Integer getValue() {
054            return queue.size();
055        }
056
057    };
058
059    final AuditBackend backend;
060
061    final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued"));
062
063    final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained"));
064
065    int timeout;
066
067    int bulksize;
068
069    Thread thread;
070
071    DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) {
072        this.backend = backend;
073        timeout = config.timeout;
074        bulksize = config.size;
075    }
076
077    @Override
078    public void onApplicationStarted() {
079        thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker");
080        thread.start();
081        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
082        if (publisher != null) {
083            publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this);
084        }
085        registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge);
086    }
087
088    @Override
089    public void onApplicationStopped() {
090        registry.remove(MetricRegistry.name("nuxeo", "audit", "size"));
091        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
092        if (publisher != null) {
093            publisher.unregisterResource("audit-bulker", "audit-bulker");
094        }
095        stopped = true;
096        try {
097            thread.interrupt();
098        } finally {
099            thread = null;
100        }
101    }
102
103    final AtomicInteger size = new AtomicInteger(0);
104
105    final ReentrantLock lock = new ReentrantLock();
106
107    final Condition isEmpty = lock.newCondition();
108
109    final Condition isFilled = lock.newCondition();
110
111    final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>();
112
113    volatile boolean stopped;
114
115    @Override
116    public void offer(LogEntry entry) {
117        if (log.isDebugEnabled()) {
118            log.debug("offered " + entry);
119        }
120        queue.add(entry);
121        queuedCount.inc();
122
123        if (size.incrementAndGet() >= bulksize) {
124            lock.lock();
125            try {
126                isFilled.signalAll();
127            } finally {
128                lock.unlock();
129            }
130        }
131    }
132
133    @Override
134    public boolean await(long time, TimeUnit unit) throws InterruptedException {
135        lock.lock();
136        try {
137            isFilled.signalAll();
138            return isEmpty.await(time, unit);
139        } finally {
140            lock.unlock();
141        }
142    }
143
144    int drain() {
145        List<LogEntry> entries = new LinkedList<>();
146        while (!queue.isEmpty()) {
147            entries.add(queue.remove());
148        }
149        backend.addLogEntries(entries);
150        int delta = entries.size();
151        size.addAndGet(-delta);
152        drainedCount.inc(delta);
153        return delta;
154    }
155
156    class Consumer implements Runnable {
157
158        @Override
159        public void run() {
160            log.info("bulk audit logger started");
161            while (!stopped) {
162                lock.lock();
163                try {
164                    isFilled.await(timeout, TimeUnit.MILLISECONDS);
165                    if (queue.isEmpty()) {
166                        continue;
167                    }
168                    int count = drain();
169                    if (log.isDebugEnabled()) {
170                        log.debug("flushed " + count + " events");
171                    }
172                } catch (InterruptedException cause) {
173                    Thread.currentThread().interrupt();
174                    return;
175                } finally {
176                    isEmpty.signalAll();
177                    lock.unlock();
178                }
179            }
180            log.info("bulk audit logger stopped");
181        }
182
183    }
184
185    @Override
186    public int getBulkTimeout() {
187        return timeout;
188    }
189
190    @Override
191    public void setBulkTimeout(int value) {
192        timeout = value;
193    }
194
195    @Override
196    public int getBulkSize() {
197        return bulksize;
198    }
199
200    @Override
201    public void setBulkSize(int value) {
202        bulksize = value;
203    }
204
205    @Override
206    public void resetMetrics() {
207        queuedCount.dec(queuedCount.getCount());
208        drainedCount.dec(drainedCount.getCount());
209    }
210}