001/* 002 * (C) Copyright 2020 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import java.util.stream.Collectors; 025 026import org.apache.commons.cli.CommandLine; 027import org.apache.commons.cli.Option; 028import org.apache.commons.cli.Options; 029import org.nuxeo.lib.stream.computation.Topology; 030import org.nuxeo.lib.stream.log.LogManager; 031 032/** 033 * Monitors consumer latencies to Datadog. 034 * 035 * @since 11.1 036 */ 037public class DatadogCommand extends MonitorCommand { 038 039 public static final String COMPUTATION_NAME = "tools/LatencyMonitorDatadog"; 040 041 protected static final String NAME = "datadog"; 042 043 private static final String DEFAULT_PREFIX = "nuxeo.streams"; 044 045 protected String apiKey; 046 047 protected List<String> tags; 048 049 @Override 050 public String name() { 051 return NAME; 052 } 053 054 @Override 055 public void updateOptions(Options options) { 056 options.addOption(Option.builder("l") 057 .longOpt("log-name") 058 .desc("Monitor consumers latency for this LOG, must be a computation Record, " 059 + "can be a comma separated list of log names or ALL") 060 .required() 061 .hasArg() 062 .argName("LOG_NAME") 063 .build()); 064 options.addOption(Option.builder() 065 .longOpt("api-key") 066 .desc("Datadog API KEY") 067 .required() 068 .hasArg() 069 .argName("API_KEY") 070 .build()); 071 options.addOption(Option.builder() 072 .longOpt("tags") 073 .desc("A comma separated list of Datadog tags, for instance: project:foo,staging:bar") 074 .hasArg() 075 .argName("TAGS") 076 .build()); 077 options.addOption(Option.builder().longOpt("partition").desc("Report metrics for each partition").build()); 078 options.addOption(Option.builder("i") 079 .longOpt("interval") 080 .desc("send latency spaced at the specified interval in seconds") 081 .hasArg() 082 .argName("INTERVAL") 083 .build()); 084 options.addOption(Option.builder("c") 085 .longOpt("count") 086 .desc("number of times the latency information is sent") 087 .hasArg() 088 .argName("COUNT") 089 .build()); 090 options.addOption(Option.builder() 091 .longOpt("prefix") 092 .desc("The metric prefix to use if not nuxeo.streams.") 093 .hasArg() 094 .argName("PREFIX") 095 .build()); 096 options.addOption(Option.builder() 097 .longOpt("codec") 098 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 099 .hasArg() 100 .argName("CODEC") 101 .build()); 102 options.addOption(Option.builder().longOpt("verbose").build()); 103 } 104 105 @Override 106 public boolean run(LogManager manager, CommandLine cmd) { 107 logNames = getLogNames(manager, cmd.getOptionValue("log-name")); 108 codec = cmd.getOptionValue("codec"); 109 verbose = cmd.hasOption("verbose"); 110 interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL)); 111 count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT)); 112 apiKey = cmd.getOptionValue("api-key"); 113 tags = getTags(cmd.getOptionValue("tags")); 114 prefix = cmd.getOptionValue("prefix", getDefaultPrefix()); 115 initTopology(manager); 116 return runProcessor(manager); 117 } 118 119 protected List<String> getTags(String tags) { 120 if (tags == null) { 121 return Collections.emptyList(); 122 } 123 return Arrays.stream(tags.split(",")).map(String::trim).collect(Collectors.toList()); 124 } 125 126 @Override 127 protected void initTopology(LogManager manager) { 128 topology = Topology.builder() 129 .addComputation(() -> new LatencyDatadogComputation(manager, logNames, apiKey, tags, prefix, 130 COMPUTATION_NAME, interval, count, partition, verbose, getRecordCodec(codec)), 131 Collections.singletonList("i1:" + INPUT_STREAM)) 132 .build(); 133 } 134 135 @Override 136 public String getDefaultPrefix() { 137 return DEFAULT_PREFIX; 138 } 139 140}