001/*
002 * (C) Copyright 2010, 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 */
019package org.nuxeo.ecm.platform.audit.service;
020
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.platform.audit.api.LogEntry;
033import org.nuxeo.ecm.platform.audit.service.extension.AuditBulkerDescriptor;
034import org.nuxeo.ecm.platform.audit.service.management.AuditBulkerMBean;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.management.ResourcePublisher;
037import org.nuxeo.runtime.metrics.MetricsService;
038
039import io.dropwizard.metrics5.Counter;
040import io.dropwizard.metrics5.Gauge;
041import io.dropwizard.metrics5.MetricRegistry;
042import io.dropwizard.metrics5.SharedMetricRegistries;
043
044/**
045 * @deprecated since 10.10, audit bulker is now handled with nuxeo-stream, no replacement
046 */
047@Deprecated
048public class DefaultAuditBulker implements AuditBulkerMBean, AuditBulker {
049
050    final Log log = LogFactory.getLog(DefaultAuditBulker.class);
051
052    final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
053
054    final Gauge<Integer> sizeGauge = new Gauge<Integer>() {
055
056        @Override
057        public Integer getValue() {
058            return queue.size();
059        }
060
061    };
062
063    final AuditBackend backend;
064
065    final Counter queuedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "queued"));
066
067    final Counter drainedCount = registry.counter(MetricRegistry.name("nuxeo", "audit", "drained"));
068
069    int timeout;
070
071    int bulksize;
072
073    Thread thread;
074
075    DefaultAuditBulker(AuditBackend backend, AuditBulkerDescriptor config) {
076        this.backend = backend;
077        timeout = config.timeout;
078        bulksize = config.size;
079    }
080
081    @Override
082    public void onApplicationStarted() {
083        thread = new Thread(new Consumer(), "Nuxeo-Audit-Bulker");
084        thread.start();
085        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
086        if (publisher != null) {
087            publisher.registerResource("audit-bulker", "audit-bulker", AuditBulkerMBean.class, this);
088        }
089        registry.register(MetricRegistry.name("nuxeo", "audit", "size"), sizeGauge);
090    }
091
092    @Override
093    public void onApplicationStopped() {
094        registry.remove(MetricRegistry.name("nuxeo", "audit", "size"));
095        ResourcePublisher publisher = Framework.getService(ResourcePublisher.class);
096        if (publisher != null) {
097            publisher.unregisterResource("audit-bulker", "audit-bulker");
098        }
099        stopped = true;
100        try {
101            thread.interrupt();
102        } finally {
103            thread = null;
104        }
105    }
106
107    final AtomicInteger size = new AtomicInteger(0);
108
109    final ReentrantLock lock = new ReentrantLock();
110
111    final Condition isEmpty = lock.newCondition();
112
113    final Condition isFilled = lock.newCondition();
114
115    final Queue<LogEntry> queue = new ConcurrentLinkedQueue<>();
116
117    volatile boolean stopped;
118
119    @Override
120    public void offer(LogEntry entry) {
121        if (log.isDebugEnabled()) {
122            log.debug("offered " + entry);
123        }
124        queue.add(entry);
125        queuedCount.inc();
126
127        if (size.incrementAndGet() >= bulksize) {
128            lock.lock();
129            try {
130                isFilled.signalAll();
131            } finally {
132                lock.unlock();
133            }
134        }
135    }
136
137    @Override
138    public boolean await(long time, TimeUnit unit) throws InterruptedException {
139        lock.lock();
140        try {
141            isFilled.signalAll();
142            long nanos = unit.toNanos(time);
143            while (!queue.isEmpty()) {
144                if (nanos <= 0) {
145                    return false;
146                }
147                nanos = isEmpty.awaitNanos(nanos);
148            }
149            return true;
150        } finally {
151            lock.unlock();
152        }
153    }
154
155    int drain() {
156        List<LogEntry> entries = new LinkedList<>();
157        while (!queue.isEmpty()) {
158            entries.add(queue.remove());
159        }
160        backend.addLogEntries(entries);
161        int delta = entries.size();
162        size.addAndGet(-delta);
163        drainedCount.inc(delta);
164        return delta;
165    }
166
167    class Consumer implements Runnable {
168
169        @Override
170        public void run() {
171            log.info("bulk audit logger started");
172            while (!stopped) {
173                lock.lock();
174                try {
175                    isFilled.await(timeout, TimeUnit.MILLISECONDS); // NOSONAR (spurious wakeups don't matter)
176                    if (queue.isEmpty()) {
177                        continue;
178                    }
179                    int count = drain();
180                    if (log.isDebugEnabled()) {
181                        log.debug("flushed " + count + " events");
182                    }
183                } catch (InterruptedException cause) {
184                    Thread.currentThread().interrupt();
185                    return;
186                } finally {
187                    try {
188                        isEmpty.signalAll();
189                    } finally {
190                        lock.unlock();
191                    }
192                }
193            }
194            log.info("bulk audit logger stopped");
195        }
196
197    }
198
199    @Override
200    public int getBulkTimeout() {
201        return timeout;
202    }
203
204    @Override
205    public void setBulkTimeout(int value) {
206        timeout = value;
207    }
208
209    @Override
210    public int getBulkSize() {
211        return bulksize;
212    }
213
214    @Override
215    public void setBulkSize(int value) {
216        bulksize = value;
217    }
218
219    @Override
220    public void resetMetrics() {
221        queuedCount.dec(queuedCount.getCount());
222        drainedCount.dec(drainedCount.getCount());
223    }
224}