001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.util.List; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.lib.stream.codec.Codec; 027import org.nuxeo.lib.stream.computation.ComputationContext; 028import org.nuxeo.lib.stream.computation.Record; 029import org.nuxeo.lib.stream.log.Latency; 030import org.nuxeo.lib.stream.log.LogManager; 031import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 032 033import com.codahale.metrics.graphite.Graphite; 034import com.codahale.metrics.graphite.GraphiteSender; 035import com.codahale.metrics.graphite.GraphiteUDP; 036 037/** 038 * A computation that sends periodically latencies to graphite. 039 * 040 * @since 10.3 041 */ 042public class LatencyMonitorComputation extends LatencyTrackerComputation { 043 private static final Log log = LogFactory.getLog(LatencyMonitorComputation.class); 044 045 protected final String host; 046 047 protected final int port; 048 049 protected final boolean udp; 050 051 protected final String basePrefix; 052 053 protected GraphiteSender graphite; 054 055 public LatencyMonitorComputation(LogManager manager, List<String> logNames, String host, int port, boolean udp, 056 String basePrefix, String computationName, int intervalSecond, int count, boolean verbose, 057 Codec<Record> codec) { 058 super(manager, logNames, computationName, intervalSecond, count, verbose, codec); 059 this.host = host; 060 this.port = port; 061 this.udp = udp; 062 this.basePrefix = basePrefix; 063 } 064 065 @Override 066 public void init(ComputationContext context) { 067 super.init(context); 068 if (udp) { 069 graphite = new GraphiteUDP(host, port); 070 } else { 071 graphite = new Graphite(host, port); 072 } 073 try { 074 graphite.connect(); 075 } catch (IOException e) { 076 throw new IllegalStateException("Fail to connect to " + host + ":" + port, e); 077 } 078 } 079 080 @Override 081 public void processTimer(ComputationContext context, String key, long timestamp) { 082 if (remaining == 0) { 083 debug("Exiting after " + count + " captures"); 084 context.askForTermination(); 085 return; 086 } 087 debug(String.format("Monitor latency %d/%d", count - remaining, count)); 088 for (LogPartitionGroup logGroup : logGroups) { 089 List<Latency> latencies = getLatenciesForPartition(logGroup, codec); 090 if (latencies.isEmpty()) { 091 continue; 092 } 093 Latency groupLatency = Latency.of(latencies); 094 publishMetrics(groupLatency, String.format("%s%s.%s.all.", basePrefix, logGroup.group, logGroup.name)); 095 for (int partition = 0; partition < latencies.size(); partition++) { 096 publishMetrics(latencies.get(partition), 097 String.format("%s%s.%s.p%02d.", basePrefix, logGroup.group, logGroup.name, partition)); 098 } 099 } 100 context.askForCheckpoint(); 101 context.setTimer("monitor", System.currentTimeMillis() + intervalMs); 102 remaining--; 103 } 104 105 private void publishMetrics(Latency latency, String prefix) { 106 debug(latency.toString()); 107 // upper is the time when the latency has been measured 108 long metricTime = latency.upper() / 1000; 109 try { 110 graphite.send(prefix + "lag", Long.toString(latency.lag().lag()), metricTime); 111 graphite.send(prefix + "end", Long.toString(latency.lag().upper()), metricTime); 112 graphite.send(prefix + "pos", Long.toString(latency.lag().lower()), metricTime); 113 graphite.send(prefix + "latency", Long.toString(latency.latency()), metricTime); 114 } catch (IOException e) { 115 log.error("Fail to send metric to graphite " + prefix + " " + latency, e); 116 } 117 } 118 119 @Override 120 public void destroy() { 121 super.destroy(); 122 if (graphite != null) { 123 try { 124 graphite.close(); 125 } catch (IOException e) { 126 log.debug("Error when closing graphite socket: ", e); 127 } 128 } 129 graphite = null; 130 } 131}