001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.lib.stream.codec.Codec;
027import org.nuxeo.lib.stream.computation.ComputationContext;
028import org.nuxeo.lib.stream.computation.Record;
029import org.nuxeo.lib.stream.log.Latency;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.Name;
032import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
033
034import io.dropwizard.metrics5.graphite.Graphite;
035import io.dropwizard.metrics5.graphite.GraphiteSender;
036import io.dropwizard.metrics5.graphite.GraphiteUDP;
037
038/**
039 * A computation that sends periodically latencies to graphite.
040 *
041 * @since 10.3
042 */
043public class LatencyMonitorComputation extends LatencyTrackerComputation {
044
045    private static final Log log = LogFactory.getLog(LatencyMonitorComputation.class);
046
047    protected final String host;
048
049    protected final int port;
050
051    protected final boolean udp;
052
053    protected final String basePrefix;
054
055    protected GraphiteSender graphite;
056
057    protected final boolean partition;
058
059    @Deprecated
060    public LatencyMonitorComputation(LogManager manager, List<Name> logNames, String host, int port, boolean udp,
061            String basePrefix, String computationName, int intervalSecond, int count, boolean verbose,
062            Codec<Record> codec) {
063        this(manager, logNames, host, port, udp, basePrefix, computationName, intervalSecond, count, true, verbose,
064                codec);
065    }
066
067    public LatencyMonitorComputation(LogManager manager, List<Name> logNames, String host, int port, boolean udp,
068                                     String basePrefix, String computationName, int intervalSecond, int count, boolean partition,
069                                     boolean verbose, Codec<Record> codec) {
070        super(manager, logNames, computationName, intervalSecond, count, verbose, codec, 0);
071        this.host = host;
072        this.port = port;
073        this.udp = udp;
074        this.partition = partition;
075        this.basePrefix = basePrefix;
076    }
077
078    @Override
079    public void init(ComputationContext context) {
080        super.init(context);
081        if (udp) {
082            graphite = new GraphiteUDP(host, port);
083        } else {
084            graphite = new Graphite(host, port);
085        }
086        try {
087            graphite.connect();
088        } catch (IOException e) {
089            throw new IllegalStateException("Fail to connect to " + host + ":" + port, e);
090        }
091    }
092
093    @Override
094    protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) {
095        Latency groupLatency = Latency.of(latencies);
096        publishMetrics(groupLatency, String.format("%s%s.%s.all.", basePrefix, logGroup.group, logGroup.name));
097        if (!partition) {
098            return;
099        }
100        for (int part = 0; part < latencies.size(); part++) {
101            publishMetrics(latencies.get(part),
102                    String.format("%s%s.%s.p%02d.", basePrefix, logGroup.group, logGroup.name, part));
103        }
104    }
105
106    protected void publishMetrics(Latency latency, String prefix) {
107        if (verbose) {
108            log.info(latency.toString());
109        }
110        // upper is the time when the latency has been measured
111        long metricTime = latency.upper() / 1000;
112        try {
113            graphite.send(prefix + "lag", Long.toString(latency.lag().lag()), metricTime);
114            graphite.send(prefix + "end", Long.toString(latency.lag().upper()), metricTime);
115            graphite.send(prefix + "pos", Long.toString(latency.lag().lower()), metricTime);
116            graphite.send(prefix + "latency", Long.toString(latency.latency()), metricTime);
117        } catch (IOException e) {
118            log.error("Fail to send metric to graphite " + prefix + " " + latency, e);
119        }
120    }
121
122    @Override
123    public void destroy() {
124        super.destroy();
125        if (graphite != null) {
126            try {
127                graphite.close();
128            } catch (IOException e) {
129                log.debug("Error when closing graphite socket: ", e);
130            }
131            graphite = null;
132        }
133    }
134}