001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.util.List; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.lib.stream.codec.Codec; 027import org.nuxeo.lib.stream.computation.ComputationContext; 028import org.nuxeo.lib.stream.computation.Record; 029import org.nuxeo.lib.stream.log.Latency; 030import org.nuxeo.lib.stream.log.LogManager; 031import org.nuxeo.lib.stream.log.Name; 032import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 033 034import io.dropwizard.metrics5.graphite.Graphite; 035import io.dropwizard.metrics5.graphite.GraphiteSender; 036import io.dropwizard.metrics5.graphite.GraphiteUDP; 037 038/** 039 * A computation that sends periodically latencies to graphite. 040 * 041 * @since 10.3 042 */ 043public class LatencyMonitorComputation extends LatencyTrackerComputation { 044 045 private static final Log log = LogFactory.getLog(LatencyMonitorComputation.class); 046 047 protected final String host; 048 049 protected final int port; 050 051 protected final boolean udp; 052 053 protected final String basePrefix; 054 055 protected GraphiteSender graphite; 056 057 protected final boolean partition; 058 059 @Deprecated 060 public LatencyMonitorComputation(LogManager manager, List<Name> logNames, String host, int port, boolean udp, 061 String basePrefix, String computationName, int intervalSecond, int count, boolean verbose, 062 Codec<Record> codec) { 063 this(manager, logNames, host, port, udp, basePrefix, computationName, intervalSecond, count, true, verbose, 064 codec); 065 } 066 067 public LatencyMonitorComputation(LogManager manager, List<Name> logNames, String host, int port, boolean udp, 068 String basePrefix, String computationName, int intervalSecond, int count, boolean partition, 069 boolean verbose, Codec<Record> codec) { 070 super(manager, logNames, computationName, intervalSecond, count, verbose, codec, 0); 071 this.host = host; 072 this.port = port; 073 this.udp = udp; 074 this.partition = partition; 075 this.basePrefix = basePrefix; 076 } 077 078 @Override 079 public void init(ComputationContext context) { 080 super.init(context); 081 if (udp) { 082 graphite = new GraphiteUDP(host, port); 083 } else { 084 graphite = new Graphite(host, port); 085 } 086 try { 087 graphite.connect(); 088 } catch (IOException e) { 089 throw new IllegalStateException("Fail to connect to " + host + ":" + port, e); 090 } 091 } 092 093 @Override 094 protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) { 095 Latency groupLatency = Latency.of(latencies); 096 publishMetrics(groupLatency, String.format("%s%s.%s.all.", basePrefix, logGroup.group, logGroup.name)); 097 if (!partition) { 098 return; 099 } 100 for (int part = 0; part < latencies.size(); part++) { 101 publishMetrics(latencies.get(part), 102 String.format("%s%s.%s.p%02d.", basePrefix, logGroup.group, logGroup.name, part)); 103 } 104 } 105 106 protected void publishMetrics(Latency latency, String prefix) { 107 if (verbose) { 108 log.info(latency.toString()); 109 } 110 // upper is the time when the latency has been measured 111 long metricTime = latency.upper() / 1000; 112 try { 113 graphite.send(prefix + "lag", Long.toString(latency.lag().lag()), metricTime); 114 graphite.send(prefix + "end", Long.toString(latency.lag().upper()), metricTime); 115 graphite.send(prefix + "pos", Long.toString(latency.lag().lower()), metricTime); 116 graphite.send(prefix + "latency", Long.toString(latency.latency()), metricTime); 117 } catch (IOException e) { 118 log.error("Fail to send metric to graphite " + prefix + " " + latency, e); 119 } 120 } 121 122 @Override 123 public void destroy() { 124 super.destroy(); 125 if (graphite != null) { 126 try { 127 graphite.close(); 128 } catch (IOException e) { 129 log.debug("Error when closing graphite socket: ", e); 130 } 131 graphite = null; 132 } 133 } 134}