001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.stream; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.nuxeo.lib.stream.computation.log.LogStreamManager.METRICS_STREAM; 023 024import java.math.BigDecimal; 025import java.math.BigInteger; 026import java.net.InetAddress; 027import java.net.UnknownHostException; 028import java.util.Map; 029import java.util.SortedMap; 030import java.util.concurrent.TimeUnit; 031 032import org.nuxeo.lib.stream.StreamRuntimeException; 033import org.nuxeo.lib.stream.computation.Record; 034import org.nuxeo.runtime.api.Framework; 035import org.nuxeo.runtime.cluster.ClusterService; 036 037import com.fasterxml.jackson.core.JsonProcessingException; 038import com.fasterxml.jackson.databind.ObjectMapper; 039import com.fasterxml.jackson.databind.node.ArrayNode; 040import com.fasterxml.jackson.databind.node.ObjectNode; 041 042import io.dropwizard.metrics5.Counter; 043import io.dropwizard.metrics5.Gauge; 044import io.dropwizard.metrics5.Histogram; 045import io.dropwizard.metrics5.Meter; 046import io.dropwizard.metrics5.MetricFilter; 047import io.dropwizard.metrics5.MetricName; 048import io.dropwizard.metrics5.MetricRegistry; 049import io.dropwizard.metrics5.ScheduledReporter; 050import io.dropwizard.metrics5.Snapshot; 051import io.dropwizard.metrics5.Timer; 052 053/** 054 * A Dropwizard Metrics Reporter that sends metrics into a Nuxeo Stream. 055 * 056 * @since 11.5 057 */ 058public class StreamMetricsReporter extends ScheduledReporter { 059 060 protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); 061 062 protected String hostIp; 063 064 protected String hostname; 065 066 protected String nodeId; 067 068 public StreamMetricsReporter(MetricRegistry registry, MetricFilter filter) { 069 super(registry, "stream-reporter", filter, TimeUnit.SECONDS, TimeUnit.SECONDS); 070 try { 071 InetAddress host = InetAddress.getLocalHost(); 072 hostIp = host.getHostAddress(); 073 hostname = host.getHostName(); 074 } catch (UnknownHostException e) { 075 hostIp = "unknown"; 076 hostname = "unknown"; 077 } 078 } 079 080 protected String getNodeId() { 081 if (nodeId == null) { 082 ClusterService clusterService = Framework.getService(ClusterService.class); 083 if (clusterService.isEnabled()) { 084 nodeId = clusterService.getNodeId(); 085 } 086 } 087 return nodeId; 088 } 089 090 @Override 091 public void report(SortedMap<MetricName, Gauge> gauges, SortedMap<MetricName, Counter> counters, 092 SortedMap<MetricName, Histogram> histograms, SortedMap<MetricName, Meter> meters, 093 SortedMap<MetricName, Timer> timers) { 094 StreamService service = Framework.getService(StreamService.class); 095 if (service == null) { 096 // stream service is not yet ready 097 return; 098 } 099 // like for other reporters, there is no need for millisecond granularity 100 long timestamp = System.currentTimeMillis() / 1000; 101 ArrayNode metrics = OBJECT_MAPPER.createArrayNode(); 102 for (Map.Entry<MetricName, Gauge> entry : gauges.entrySet()) { 103 reportGauge(metrics, entry.getKey(), entry.getValue()); 104 } 105 for (Map.Entry<MetricName, Timer> entry : timers.entrySet()) { 106 reportTimer(metrics, entry.getKey(), entry.getValue()); 107 } 108 for (Map.Entry<MetricName, Counter> entry : counters.entrySet()) { 109 reportCounter(metrics, entry.getKey(), entry.getValue()); 110 } 111 ObjectNode ret = OBJECT_MAPPER.createObjectNode(); 112 ret.put("timestamp", timestamp); 113 ret.put("hostname", hostname); 114 ret.put("ip", hostIp); 115 ret.put("nodeId", getNodeId()); 116 ret.set("metrics", metrics); 117 try { 118 service.getStreamManager() 119 .append(METRICS_STREAM, 120 Record.of(hostIp, OBJECT_MAPPER.writer().writeValueAsString(ret).getBytes(UTF_8))); 121 122 } catch (JsonProcessingException e) { 123 throw new StreamRuntimeException("Cannot convert to json", e); 124 } 125 } 126 127 protected void reportTimer(ArrayNode metrics, MetricName key, Timer value) { 128 ObjectNode metric = OBJECT_MAPPER.createObjectNode(); 129 metric.put("k", key.getKey()); 130 key.getTags().forEach(metric::put); 131 if (value.getCount() == 0) { 132 // don't report empty timer 133 metric.put("count", 0); 134 } else { 135 metric.put("count", value.getCount()); 136 metric.put("rate1m", value.getOneMinuteRate()); 137 metric.put("rate5m", value.getFiveMinuteRate()); 138 metric.put("sum", value.getSum()); 139 Snapshot snapshot = value.getSnapshot(); 140 metric.put("max", convertDuration(snapshot.getMax())); 141 metric.put("mean", convertDuration(snapshot.getMean())); 142 metric.put("min", convertDuration(snapshot.getMin())); 143 metric.put("stddev", convertDuration(snapshot.getStdDev())); 144 metric.put("p50", convertDuration(snapshot.getMedian())); 145 metric.put("p95", convertDuration(snapshot.get95thPercentile())); 146 metric.put("p99", convertDuration(snapshot.get99thPercentile())); 147 } 148 metrics.add(metric); 149 } 150 151 protected void reportCounter(ArrayNode metrics, MetricName key, Counter value) { 152 ObjectNode metric = OBJECT_MAPPER.createObjectNode(); 153 metric.put("k", key.getKey()); 154 key.getTags().forEach(metric::put); 155 metric.put("v", value.getCount()); 156 metrics.add(metric); 157 } 158 159 protected void reportGauge(ArrayNode metrics, MetricName key, Gauge<?> value) { 160 ObjectNode metric = OBJECT_MAPPER.createObjectNode(); 161 metric.put("k", key.getKey()); 162 key.getTags().forEach(metric::put); 163 putGaugeMetric(metric, value.getValue()); 164 metrics.add(metric); 165 } 166 167 protected void putGaugeMetric(ObjectNode metric, Object o) { 168 if (o instanceof Float) { 169 metric.put("v", (Float) o); 170 } else if (o instanceof Double) { 171 metric.put("v", (Double) o); 172 } else if (o instanceof Byte) { 173 metric.put("v", ((Byte) o).intValue()); 174 } else if (o instanceof Short) { 175 metric.put("v", ((Short) o)); 176 } else if (o instanceof Integer) { 177 metric.put("v", ((Integer) o)); 178 } else if (o instanceof Long) { 179 metric.put("v", ((Long) o)); 180 } else if (o instanceof BigInteger) { 181 metric.put("v", ((BigInteger) o)); 182 } else if (o instanceof BigDecimal) { 183 metric.put("v", ((BigDecimal) o)); 184 } else if (o instanceof Boolean) { 185 metric.put("v", (Boolean) o ? 1 : 0); 186 } 187 } 188}