001/*
002 * (C) Copyright 2020 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.stream.Collectors;
025
026import org.apache.commons.cli.CommandLine;
027import org.apache.commons.cli.Option;
028import org.apache.commons.cli.Options;
029import org.nuxeo.lib.stream.computation.Topology;
030import org.nuxeo.lib.stream.log.LogManager;
031
032/**
033 * Monitors consumer latencies to Datadog.
034 *
035 * @since 11.1
036 */
037public class DatadogCommand extends MonitorCommand {
038
039    public static final String COMPUTATION_NAME = "tools/LatencyMonitorDatadog";
040
041    protected static final String NAME = "datadog";
042
043    private static final String DEFAULT_PREFIX = "nuxeo.streams";
044
045    protected String apiKey;
046
047    protected List<String> tags;
048
049    @Override
050    public String name() {
051        return NAME;
052    }
053
054    @Override
055    public void updateOptions(Options options) {
056        options.addOption(Option.builder("l")
057                                .longOpt("log-name")
058                                .desc("Monitor consumers latency for this LOG, must be a computation Record, "
059                                        + "can be a comma separated list of log names or ALL")
060                                .required()
061                                .hasArg()
062                                .argName("LOG_NAME")
063                                .build());
064        options.addOption(Option.builder()
065                                .longOpt("api-key")
066                                .desc("Datadog API KEY")
067                                .required()
068                                .hasArg()
069                                .argName("API_KEY")
070                                .build());
071        options.addOption(Option.builder()
072                                .longOpt("tags")
073                                .desc("A comma separated list of Datadog tags, for instance: project:foo,staging:bar")
074                                .hasArg()
075                                .argName("TAGS")
076                                .build());
077        options.addOption(Option.builder().longOpt("partition").desc("Report metrics for each partition").build());
078        options.addOption(Option.builder("i")
079                                .longOpt("interval")
080                                .desc("send latency spaced at the specified interval in seconds")
081                                .hasArg()
082                                .argName("INTERVAL")
083                                .build());
084        options.addOption(Option.builder("c")
085                                .longOpt("count")
086                                .desc("number of times the latency information is sent")
087                                .hasArg()
088                                .argName("COUNT")
089                                .build());
090        options.addOption(Option.builder()
091                                .longOpt("prefix")
092                                .desc("The metric prefix to use if not nuxeo.streams.")
093                                .hasArg()
094                                .argName("PREFIX")
095                                .build());
096        options.addOption(Option.builder()
097                                .longOpt("codec")
098                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
099                                .hasArg()
100                                .argName("CODEC")
101                                .build());
102        options.addOption(Option.builder().longOpt("verbose").build());
103    }
104
105    @Override
106    public boolean run(LogManager manager, CommandLine cmd) {
107        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
108        codec = cmd.getOptionValue("codec");
109        verbose = cmd.hasOption("verbose");
110        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
111        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
112        apiKey = cmd.getOptionValue("api-key");
113        tags = getTags(cmd.getOptionValue("tags"));
114        prefix = cmd.getOptionValue("prefix", getDefaultPrefix());
115        initTopology(manager);
116        return runProcessor(manager);
117    }
118
119    protected List<String> getTags(String tags) {
120        if (tags == null) {
121            return Collections.emptyList();
122        }
123        return Arrays.stream(tags.split(",")).map(String::trim).collect(Collectors.toList());
124    }
125
126    @Override
127    protected void initTopology(LogManager manager) {
128        topology = Topology.builder()
129                           .addComputation(() -> new LatencyDatadogComputation(manager, logNames, apiKey, tags, prefix,
130                                   COMPUTATION_NAME, interval, count, partition, verbose, getRecordCodec(codec)),
131                                   Collections.singletonList("i1:" + INPUT_STREAM))
132                           .build();
133    }
134
135    @Override
136    public String getDefaultPrefix() {
137        return DEFAULT_PREFIX;
138    }
139
140}