001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.net.InetAddress; 023import java.net.UnknownHostException; 024import java.util.ArrayList; 025import java.util.List; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.coursera.metrics.datadog.model.DatadogGauge; 030import org.coursera.metrics.datadog.transport.HttpTransport; 031import org.coursera.metrics.datadog.transport.Transport; 032import org.nuxeo.lib.stream.codec.Codec; 033import org.nuxeo.lib.stream.computation.ComputationContext; 034import org.nuxeo.lib.stream.computation.Record; 035import org.nuxeo.lib.stream.log.Latency; 036import org.nuxeo.lib.stream.log.LogManager; 037import org.nuxeo.lib.stream.log.Name; 038import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 039 040/** 041 * A computation that sends periodically latencies to Datadog. 042 * 043 * @since 11.1 044 */ 045public class LatencyDatadogComputation extends LatencyTrackerComputation { 046 047 private static final Log log = LogFactory.getLog(LatencyDatadogComputation.class); 048 049 protected static final String HOSTNAME_UNKNOWN = "unknown"; 050 051 protected final String apiKey; 052 053 protected final List<String> tags; 054 055 protected final String basePrefix; 056 057 protected final boolean partition; 058 059 protected final String hostname; 060 061 protected HttpTransport transport; 062 063 public LatencyDatadogComputation(LogManager manager, List<Name> logNames, String apiKey, List<String> tags, 064 String basePrefix, String computationName, int intervalSecond, int count, boolean partition, 065 boolean verbose, Codec<Record> codec) { 066 super(manager, logNames, computationName, intervalSecond, count, verbose, codec, 0); 067 this.apiKey = apiKey; 068 this.tags = tags; 069 this.basePrefix = basePrefix; 070 this.partition = partition; 071 hostname = getHostName(); 072 } 073 074 protected String getHostName() { 075 try { 076 return InetAddress.getLocalHost().getHostName().split("\\.")[0]; 077 } catch (UnknownHostException e) { 078 return HOSTNAME_UNKNOWN; 079 } 080 } 081 082 @Override 083 public void init(ComputationContext context) { 084 super.init(context); 085 transport = new HttpTransport.Builder().withApiKey(apiKey).build(); 086 } 087 088 @Override 089 protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) { 090 Latency groupLatency = Latency.of(latencies); 091 publishMetrics(groupLatency, basePrefix, "all", logGroup.name, logGroup.group); 092 if (!partition) { 093 return; 094 } 095 for (int part = 0; part < latencies.size(); part++) { 096 publishMetrics(groupLatency, basePrefix, String.format("%02d", part), logGroup.name, logGroup.group); 097 } 098 } 099 100 protected void publishMetrics(Latency latency, String prefix, String partition, Name stream, Name group) { 101 if (verbose) { 102 log.info(latency.toString()); 103 } 104 // upper is the time when the latency has been measured 105 long metricTime = latency.upper() / 1000; 106 List<String> mTags = new ArrayList<>(tags.size() + 3); 107 mTags.addAll(tags); 108 mTags.add("stream:" + stream); 109 mTags.add("consumer:" + group); 110 mTags.add("partition:" + partition); 111 try { 112 Transport.Request request = transport.prepare(); 113 request.addGauge(new DatadogGauge(prefix + ".lag", latency.lag().lag(), metricTime, hostname, mTags)); 114 request.addGauge(new DatadogGauge(prefix + ".end", latency.lag().upper(), metricTime, hostname, mTags)); 115 request.addGauge(new DatadogGauge(prefix + ".pos", latency.lag().lower(), metricTime, hostname, mTags)); 116 request.addGauge(new DatadogGauge(prefix + ".latency", latency.latency(), metricTime, hostname, mTags)); 117 request.send(); 118 } catch (IOException e) { 119 log.error("Fail to prepare metric to datadog " + prefix + " " + latency, e); 120 } catch (Exception e) { 121 log.error("Fail to send metric to datadog " + prefix + " " + latency, e); 122 } 123 } 124 125 @Override 126 public void destroy() { 127 super.destroy(); 128 if (transport != null) { 129 try { 130 transport.close(); 131 } catch (IOException e) { 132 log.debug("Error when closing Datadog client: ", e); 133 } 134 transport = null; 135 } 136 } 137}