001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.lib.stream.codec.Codec;
027import org.nuxeo.lib.stream.computation.ComputationContext;
028import org.nuxeo.lib.stream.computation.Record;
029import org.nuxeo.lib.stream.log.Latency;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
032
033import com.codahale.metrics.graphite.Graphite;
034import com.codahale.metrics.graphite.GraphiteSender;
035import com.codahale.metrics.graphite.GraphiteUDP;
036
037/**
038 * A computation that sends periodically latencies to graphite.
039 *
040 * @since 10.3
041 */
042public class LatencyMonitorComputation extends LatencyTrackerComputation {
043    private static final Log log = LogFactory.getLog(LatencyMonitorComputation.class);
044
045    protected final String host;
046
047    protected final int port;
048
049    protected final boolean udp;
050
051    protected final String basePrefix;
052
053    protected GraphiteSender graphite;
054
055    public LatencyMonitorComputation(LogManager manager, List<String> logNames, String host, int port, boolean udp,
056            String basePrefix, String computationName, int intervalSecond, int count, boolean verbose,
057            Codec<Record> codec) {
058        super(manager, logNames, computationName, intervalSecond, count, verbose, codec);
059        this.host = host;
060        this.port = port;
061        this.udp = udp;
062        this.basePrefix = basePrefix;
063    }
064
065    @Override
066    public void init(ComputationContext context) {
067        super.init(context);
068        if (udp) {
069            graphite = new GraphiteUDP(host, port);
070        } else {
071            graphite = new Graphite(host, port);
072        }
073        try {
074            graphite.connect();
075        } catch (IOException e) {
076            throw new IllegalStateException("Fail to connect to " + host + ":" + port, e);
077        }
078    }
079
080    @Override
081    public void processTimer(ComputationContext context, String key, long timestamp) {
082        if (remaining == 0) {
083            debug("Exiting after " + count + " captures");
084            context.askForTermination();
085            return;
086        }
087        debug(String.format("Monitor latency %d/%d", count - remaining, count));
088        for (LogPartitionGroup logGroup : logGroups) {
089            List<Latency> latencies = getLatenciesForPartition(logGroup, codec);
090            if (latencies.isEmpty()) {
091                continue;
092            }
093            Latency groupLatency = Latency.of(latencies);
094            publishMetrics(groupLatency, String.format("%s%s.%s.all.", basePrefix, logGroup.group, logGroup.name));
095            for (int partition = 0; partition < latencies.size(); partition++) {
096                publishMetrics(latencies.get(partition),
097                        String.format("%s%s.%s.p%02d.", basePrefix, logGroup.group, logGroup.name, partition));
098            }
099        }
100        context.askForCheckpoint();
101        context.setTimer("monitor", System.currentTimeMillis() + intervalMs);
102        remaining--;
103    }
104
105    private void publishMetrics(Latency latency, String prefix) {
106        debug(latency.toString());
107        // upper is the time when the latency has been measured
108        long metricTime = latency.upper() / 1000;
109        try {
110            graphite.send(prefix + "lag", Long.toString(latency.lag().lag()), metricTime);
111            graphite.send(prefix + "end", Long.toString(latency.lag().upper()), metricTime);
112            graphite.send(prefix + "pos", Long.toString(latency.lag().lower()), metricTime);
113            graphite.send(prefix + "latency", Long.toString(latency.latency()), metricTime);
114        } catch (IOException e) {
115            log.error("Fail to send metric to graphite " + prefix + " " + latency, e);
116        }
117    }
118
119    @Override
120    public void destroy() {
121        super.destroy();
122        if (graphite != null) {
123            try {
124                graphite.close();
125            } catch (IOException e) {
126                log.debug("Error when closing graphite socket: ", e);
127            }
128        }
129        graphite = null;
130    }
131}