001/*
002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 */
019package org.nuxeo.ecm.platform.audit.service;
020
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Condition;
027import java.util.concurrent.locks.ReentrantLock;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.platform.audit.api.LogEntry;
032import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor;
033import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean;
034import org.nuxeo.runtime.api.Framework;
035import org.nuxeo.runtime.management.ResourcePublisher;
036import org.nuxeo.runtime.metrics.MetricsService;
037
038import com.codahale.metrics.Counter;
039import com.codahale.metrics.Gauge;
040import com.codahale.metrics.MetricRegistry;
041import com.codahale.metrics.SharedMetricRegistries;
042
043public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker {
044
045    final Log log = LogFactory.getLog(DefaultAuditBulker.class);
046
047    final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
048
049    final Gauge<Integer> sizeGauge = new Gauge<Integer>() {
050
051        @Override
052        public Integer getValue() {
053            return queue.size();
054        }
055
056    };
057
058    final AuditBackend backend;
059
060    final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued"));
061
062    final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained"));
063
064    int timeout;
065
066    int size;
067
068    Thread thread;
069
070    DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) {
071        this.backend = backend;
072        timeout = config.timeout;
073        size = config.size;
074    }
075
076    @Override
077    public void onApplicationStarted() {
078        thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker");
079        thread.start();
080        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
081        if (publisher != null) {
082            publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this);
083        }
084        registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge);
085    }
086
087    @Override
088    public void onShutdown() {
089        registry.remove(MetricRegistry.name("nuxeo", "audit", "size"));
090        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
091        if (publisher != null) {
092            publisher.unregisterResource("audit-bulker", "audit-bulker");
093        }
094        stopped = true;
095        try {
096            thread.interrupt();
097        } finally {
098            thread = null;
099        }
100    }
101
102    final ReentrantLock lock = new ReentrantLock();
103
104    final Condition isEmpty = lock.newCondition();
105
106    final Condition isFilled = lock.newCondition();
107
108    final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>();
109
110    volatile boolean stopped;
111
112    @Override
113    public void offer(LogEntry entry) {
114        if (log.isDebugEnabled()) {
115            log.debug("offered " + entry);
116        }
117        queue.add(entry);
118        queuedCount.inc();
119        if (queue.size() >= size) {
120            lock.lock();
121            try {
122                isFilled.signalAll();
123            } finally {
124                lock.unlock();
125            }
126        }
127    }
128
129    @Override
130    public boolean await(long time, TimeUnit unit) throws InterruptedException {
131        if (queue.isEmpty()) {
132            return true;
133        }
134        lock.lock();
135        try {
136            isFilled.signalAll();
137            return isEmpty.await(time, unit);
138        } finally {
139            lock.unlock();
140        }
141    }
142
143    int drain() {
144        List<LogEntry> entries = new LinkedList<>();
145        while (!queue.isEmpty()) {
146            entries.add(queue.remove());
147        }
148        backend.addLogEntries(entries);
149        drainedCount.inc(entries.size());
150        if (queue.isEmpty()) {
151            lock.lock();
152            try {
153                isEmpty.signalAll();
154            } finally {
155                lock.unlock();
156            }
157        }
158        return entries.size();
159    }
160
161    class Consumer implements Runnable {
162
163        @Override
164        public void run() {
165            log.info("bulk audit logger started");
166            while (!stopped) {
167                lock.lock();
168                try {
169                    isFilled.await(timeout, TimeUnit.MILLISECONDS);
170                    if (queue.isEmpty()) {
171                        continue;
172                    }
173                } catch (InterruptedException cause) {
174                    Thread.currentThread().interrupt();
175                    return;
176                } finally {
177                    lock.unlock();
178                }
179                try {
180                    int count = drain();
181                    if (log.isDebugEnabled()) {
182                        log.debug("flushed " + count + " events");
183                    }
184                } catch (RuntimeException cause) {
185                    log.error("caught error while draining audit queue", cause);
186                }
187            }
188            log.info("bulk audit logger stopped");
189        }
190
191    }
192
193    @Override
194    public int getBulkTimeout() {
195        return timeout;
196    }
197
198    @Override
199    public void setBulkTimeout(int value) {
200        timeout = value;
201    }
202
203    @Override
204    public int getBulkSize() {
205        return size;
206    }
207
208    @Override
209    public void setBulkSize(int value) {
210        size = value;
211    }
212
213    @Override
214    public void resetMetrics() {
215        queuedCount.dec(queuedCount.getCount());
216        drainedCount.dec(drainedCount.getCount());
217    }
218}