001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.IOException;
022import java.net.InetAddress;
023import java.net.UnknownHostException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.coursera.metrics.datadog.model.DatadogGauge;
030import org.coursera.metrics.datadog.transport.HttpTransport;
031import org.coursera.metrics.datadog.transport.Transport;
032import org.nuxeo.lib.stream.codec.Codec;
033import org.nuxeo.lib.stream.computation.ComputationContext;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.lib.stream.log.Latency;
036import org.nuxeo.lib.stream.log.LogManager;
037import org.nuxeo.lib.stream.log.Name;
038import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
039
040/**
041 * A computation that sends periodically latencies to Datadog.
042 *
043 * @since 11.1
044 */
045public class LatencyDatadogComputation extends LatencyTrackerComputation {
046
047    private static final Log log = LogFactory.getLog(LatencyDatadogComputation.class);
048
049    protected static final String HOSTNAME_UNKNOWN = "unknown";
050
051    protected final String apiKey;
052
053    protected final List<String> tags;
054
055    protected final String basePrefix;
056
057    protected final boolean partition;
058
059    protected final String hostname;
060
061    protected HttpTransport transport;
062
063    public LatencyDatadogComputation(LogManager manager, List<Name> logNames, String apiKey, List<String> tags,
064            String basePrefix, String computationName, int intervalSecond, int count, boolean partition,
065            boolean verbose, Codec<Record> codec) {
066        super(manager, logNames, computationName, intervalSecond, count, verbose, codec, 0);
067        this.apiKey = apiKey;
068        this.tags = tags;
069        this.basePrefix = basePrefix;
070        this.partition = partition;
071        hostname = getHostName();
072    }
073
074    protected String getHostName() {
075        try {
076            return InetAddress.getLocalHost().getHostName().split("\\.")[0];
077        } catch (UnknownHostException e) {
078            return HOSTNAME_UNKNOWN;
079        }
080    }
081
082    @Override
083    public void init(ComputationContext context) {
084        super.init(context);
085        transport = new HttpTransport.Builder().withApiKey(apiKey).build();
086    }
087
088    @Override
089    protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) {
090        Latency groupLatency = Latency.of(latencies);
091        publishMetrics(groupLatency, basePrefix, "all", logGroup.name, logGroup.group);
092        if (!partition) {
093            return;
094        }
095        for (int part = 0; part < latencies.size(); part++) {
096            publishMetrics(groupLatency, basePrefix, String.format("%02d", part), logGroup.name, logGroup.group);
097        }
098    }
099
100    protected void publishMetrics(Latency latency, String prefix, String partition, Name stream, Name group) {
101        if (verbose) {
102            log.info(latency.toString());
103        }
104        // upper is the time when the latency has been measured
105        long metricTime = latency.upper() / 1000;
106        List<String> mTags = new ArrayList<>(tags.size() + 3);
107        mTags.addAll(tags);
108        mTags.add("stream:" + stream);
109        mTags.add("consumer:" + group);
110        mTags.add("partition:" + partition);
111        try {
112            Transport.Request request = transport.prepare();
113            request.addGauge(new DatadogGauge(prefix + ".lag", latency.lag().lag(), metricTime, hostname, mTags));
114            request.addGauge(new DatadogGauge(prefix + ".end", latency.lag().upper(), metricTime, hostname, mTags));
115            request.addGauge(new DatadogGauge(prefix + ".pos", latency.lag().lower(), metricTime, hostname, mTags));
116            request.addGauge(new DatadogGauge(prefix + ".latency", latency.latency(), metricTime, hostname, mTags));
117            request.send();
118        } catch (IOException e) {
119            log.error("Fail to prepare metric to datadog " + prefix + " " + latency, e);
120        } catch (Exception e) {
121            log.error("Fail to send metric to datadog " + prefix + " " + latency, e);
122        }
123    }
124
125    @Override
126    public void destroy() {
127        super.destroy();
128        if (transport != null) {
129            try {
130                transport.close();
131            } catch (IOException e) {
132                log.debug("Error when closing Datadog client: ", e);
133            }
134            transport = null;
135        }
136    }
137}