001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.nuxeo.lib.stream.computation.log.LogStreamManager.METRICS_STREAM;
023
024import java.math.BigDecimal;
025import java.math.BigInteger;
026import java.net.InetAddress;
027import java.net.UnknownHostException;
028import java.util.Map;
029import java.util.SortedMap;
030import java.util.concurrent.TimeUnit;
031
032import org.nuxeo.lib.stream.StreamRuntimeException;
033import org.nuxeo.lib.stream.computation.Record;
034import org.nuxeo.runtime.api.Framework;
035import org.nuxeo.runtime.cluster.ClusterService;
036
037import com.fasterxml.jackson.core.JsonProcessingException;
038import com.fasterxml.jackson.databind.ObjectMapper;
039import com.fasterxml.jackson.databind.node.ArrayNode;
040import com.fasterxml.jackson.databind.node.ObjectNode;
041
042import io.dropwizard.metrics5.Counter;
043import io.dropwizard.metrics5.Gauge;
044import io.dropwizard.metrics5.Histogram;
045import io.dropwizard.metrics5.Meter;
046import io.dropwizard.metrics5.MetricFilter;
047import io.dropwizard.metrics5.MetricName;
048import io.dropwizard.metrics5.MetricRegistry;
049import io.dropwizard.metrics5.ScheduledReporter;
050import io.dropwizard.metrics5.Snapshot;
051import io.dropwizard.metrics5.Timer;
052
053/**
054 * A Dropwizard Metrics Reporter that sends metrics into a Nuxeo Stream.
055 *
056 * @since 11.5
057 */
058public class StreamMetricsReporter extends ScheduledReporter {
059
060    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
061
062    protected String hostIp;
063
064    protected String hostname;
065
066    protected String nodeId;
067
068    public StreamMetricsReporter(MetricRegistry registry, MetricFilter filter) {
069        super(registry, "stream-reporter", filter, TimeUnit.SECONDS, TimeUnit.SECONDS);
070        try {
071            InetAddress host = InetAddress.getLocalHost();
072            hostIp = host.getHostAddress();
073            hostname = host.getHostName();
074        } catch (UnknownHostException e) {
075            hostIp = "unknown";
076            hostname = "unknown";
077        }
078    }
079
080    protected String getNodeId() {
081        if (nodeId == null) {
082            ClusterService clusterService = Framework.getService(ClusterService.class);
083            if (clusterService.isEnabled()) {
084                nodeId = clusterService.getNodeId();
085            }
086        }
087        return nodeId;
088    }
089
090    @Override
091    public void report(SortedMap<MetricName, Gauge> gauges, SortedMap<MetricName, Counter> counters,
092            SortedMap<MetricName, Histogram> histograms, SortedMap<MetricName, Meter> meters,
093            SortedMap<MetricName, Timer> timers) {
094        StreamService service = Framework.getService(StreamService.class);
095        if (service == null) {
096            // stream service is not yet ready
097            return;
098        }
099        // like for other reporters, there is no need for millisecond granularity
100        long timestamp = System.currentTimeMillis() / 1000;
101        ArrayNode metrics = OBJECT_MAPPER.createArrayNode();
102        for (Map.Entry<MetricName, Gauge> entry : gauges.entrySet()) {
103            reportGauge(metrics, entry.getKey(), entry.getValue());
104        }
105        for (Map.Entry<MetricName, Timer> entry : timers.entrySet()) {
106            reportTimer(metrics, entry.getKey(), entry.getValue());
107        }
108        for (Map.Entry<MetricName, Counter> entry : counters.entrySet()) {
109            reportCounter(metrics, entry.getKey(), entry.getValue());
110        }
111        ObjectNode ret = OBJECT_MAPPER.createObjectNode();
112        ret.put("timestamp", timestamp);
113        ret.put("hostname", hostname);
114        ret.put("ip", hostIp);
115        ret.put("nodeId", getNodeId());
116        ret.set("metrics", metrics);
117        try {
118            service.getStreamManager()
119                   .append(METRICS_STREAM,
120                           Record.of(hostIp, OBJECT_MAPPER.writer().writeValueAsString(ret).getBytes(UTF_8)));
121
122        } catch (JsonProcessingException e) {
123            throw new StreamRuntimeException("Cannot convert to json", e);
124        }
125    }
126
127    protected void reportTimer(ArrayNode metrics, MetricName key, Timer value) {
128        ObjectNode metric = OBJECT_MAPPER.createObjectNode();
129        metric.put("k", key.getKey());
130        key.getTags().forEach(metric::put);
131        if (value.getCount() == 0) {
132            // don't report empty timer
133            metric.put("count", 0);
134        } else {
135            metric.put("count", value.getCount());
136            metric.put("rate1m", value.getOneMinuteRate());
137            metric.put("rate5m", value.getFiveMinuteRate());
138            metric.put("sum", value.getSum());
139            Snapshot snapshot = value.getSnapshot();
140            metric.put("max", convertDuration(snapshot.getMax()));
141            metric.put("mean", convertDuration(snapshot.getMean()));
142            metric.put("min", convertDuration(snapshot.getMin()));
143            metric.put("stddev", convertDuration(snapshot.getStdDev()));
144            metric.put("p50", convertDuration(snapshot.getMedian()));
145            metric.put("p95", convertDuration(snapshot.get95thPercentile()));
146            metric.put("p99", convertDuration(snapshot.get99thPercentile()));
147        }
148        metrics.add(metric);
149    }
150
151    protected void reportCounter(ArrayNode metrics, MetricName key, Counter value) {
152        ObjectNode metric = OBJECT_MAPPER.createObjectNode();
153        metric.put("k", key.getKey());
154        key.getTags().forEach(metric::put);
155        metric.put("v", value.getCount());
156        metrics.add(metric);
157    }
158
159    protected void reportGauge(ArrayNode metrics, MetricName key, Gauge<?> value) {
160        ObjectNode metric = OBJECT_MAPPER.createObjectNode();
161        metric.put("k", key.getKey());
162        key.getTags().forEach(metric::put);
163        putGaugeMetric(metric, value.getValue());
164        metrics.add(metric);
165    }
166
167    protected void putGaugeMetric(ObjectNode metric, Object o) {
168        if (o instanceof Float) {
169            metric.put("v", (Float) o);
170        } else if (o instanceof Double) {
171            metric.put("v", (Double) o);
172        } else if (o instanceof Byte) {
173            metric.put("v", ((Byte) o).intValue());
174        } else if (o instanceof Short) {
175            metric.put("v", ((Short) o));
176        } else if (o instanceof Integer) {
177            metric.put("v", ((Integer) o));
178        } else if (o instanceof Long) {
179            metric.put("v", ((Long) o));
180        } else if (o instanceof BigInteger) {
181            metric.put("v", ((BigInteger) o));
182        } else if (o instanceof BigDecimal) {
183            metric.put("v", ((BigDecimal) o));
184        } else if (o instanceof Boolean) {
185            metric.put("v", (Boolean) o ? 1 : 0);
186        }
187    }
188}