/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogManager;

/**
 * Monitor consumer latencies to graphite.
 * <p>
 * The command parses its options, builds a single-computation topology around a
 * {@link LatencyMonitorComputation} and runs it with a {@link LogStreamProcessor} until the processor reports
 * termination or the calling thread is interrupted.
 *
 * @since 10.3
 */
public class MonitorCommand extends Command {

    public static final String COMPUTATION_NAME = "LatencyMonitor";

    public static final String INPUT_STREAM = "log_null";

    /** Logs whose name starts with this prefix are skipped when all logs are requested. */
    public static final String INTERNAL_LOG_PREFIX = "_";

    protected static final String NAME = "monitor";

    /** Default value of the {@code interval} option, documented by the option as seconds. */
    protected static final String DEFAULT_INTERVAL = "60";

    /** Default value of the {@code count} option. */
    protected static final String DEFAULT_COUNT = "-1";

    /** Keyword (matched case-insensitively) selecting every non internal log. */
    protected static final String ALL_LOGS = "all";

    /** Default value of the {@code port} option. */
    protected static final String DEFAULT_PORT = "2003";

    protected boolean verbose = false;

    // NOTE(review): this field is not assigned anywhere in this class, so the output stream declared by
    // initTopology is literally "o1:null" unless a subclass sets it - confirm whether an output stream is needed.
    protected String output;

    protected List<String> logNames;

    protected int interval;

    protected int count;

    protected Topology topology;

    protected LogStreamProcessor processor;

    protected String codec;

    protected String host;

    protected int port;

    protected boolean udp;

    protected String prefix;

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the command line options understood by this command.
     *
     * @param options the option set to complete
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Monitor consumers latency for this LOG, must be a computation Record, "
                                        + "can be a comma separated list of log names or ALL")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("h")
                                .longOpt("host")
                                .desc("The carbon server host")
                                .required()
                                .hasArg()
                                .argName("HOST")
                                .build());
        options.addOption(Option.builder("p")
                                .longOpt("port")
                                .desc("The carbon server port if not 2003")
                                .hasArg()
                                .argName("PORT")
                                .build());
        options.addOption("u", "udp", false, "Carbon instance is listening using UDP");
        options.addOption(Option.builder("i")
                                .longOpt("interval")
                                .desc("send latency spaced at the specified interval in seconds")
                                .hasArg()
                                .argName("INTERVAL")
                                .build());
        options.addOption(Option.builder("c")
                                .longOpt("count")
                                .desc("number of time to send the latency information")
                                .hasArg()
                                .argName("COUNT")
                                .build());
        // The advertised default must match getDefaultPrefix(), which starts with "servers." (plural).
        options.addOption(Option.builder()
                                .longOpt("prefix")
                                .desc("The metric prefix to use if not servers.<hostname>.nuxeo.streams.")
                                .hasArg()
                                .argName("PREFIX")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(Option.builder().longOpt("verbose").build());
    }

    /**
     * Reads the options from the command line, builds the topology and runs the processor.
     *
     * @param manager the log manager used to resolve log names and to run the processor
     * @param cmd the parsed command line
     * @return {@code true} when the processor terminated, {@code false} when the wait was interrupted
     * @throws IllegalArgumentException if a requested log does not exist, if no log name is given, or if a numeric
     *             option is not a valid integer ({@link NumberFormatException})
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
        codec = cmd.getOptionValue("codec");
        verbose = cmd.hasOption("verbose");
        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
        port = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_PORT));
        host = cmd.getOptionValue("host");
        udp = cmd.hasOption("udp");
        prefix = cmd.getOptionValue("prefix", getDefaultPrefix());
        initTopology(manager);
        return runProcessor(manager);
    }

    /**
     * Resolves the {@code log-name} option value into a list of log names.
     * <p>
     * The {@value #ALL_LOGS} keyword (case-insensitive) selects every log listed by the manager whose name does not
     * start with {@link #INTERNAL_LOG_PREFIX}. Otherwise the value is split on commas; surrounding whitespace is
     * removed from each entry and blank entries are ignored.
     *
     * @param manager the log manager used to list and check logs
     * @param names the raw option value
     * @return the log names to monitor
     * @throws IllegalArgumentException if no log name is given or if a given log does not exist
     */
    protected List<String> getLogNames(LogManager manager, String names) {
        String requested = names == null ? "" : names.trim();
        if (ALL_LOGS.equalsIgnoreCase(requested)) {
            return manager.listAll()
                          .stream()
                          .filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX))
                          .collect(Collectors.toList());
        }
        // Tolerate "a, b" and trailing commas instead of failing on a name made of (or padded with) whitespace.
        List<String> ret = Arrays.stream(requested.split(","))
                                 .map(String::trim)
                                 .filter(name -> !name.isEmpty())
                                 .collect(Collectors.toList());
        if (ret.isEmpty()) {
            throw new IllegalArgumentException("No log name provided");
        }
        for (String name : ret) {
            if (!manager.exists(name)) {
                throw new IllegalArgumentException("Unknown log name: " + name);
            }
        }
        return ret;
    }

    /**
     * Builds the topology made of a single {@link LatencyMonitorComputation} configured from the fields set by
     * {@link #run(LogManager, CommandLine)}.
     *
     * @param manager the log manager handed to the computation
     */
    protected void initTopology(LogManager manager) {
        // getRecordCodec is not defined in this class; presumably inherited from Command.
        topology = Topology.builder()
                           .addComputation(
                                   () -> new LatencyMonitorComputation(manager, logNames, host, port, udp, prefix,
                                           COMPUTATION_NAME, interval, count, verbose, getRecordCodec(codec)),
                                   Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output))
                           .build();
    }

    /**
     * Returns the metric prefix used when the {@code prefix} option is not given:
     * {@code servers.<hostname>.nuxeo.streams.} where {@code <hostname>} is the local host name up to its first dot.
     *
     * @return the default metric prefix, using {@code unknown} as host name when the local host cannot be resolved
     */
    public String getDefaultPrefix() {
        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0];
        } catch (UnknownHostException e) {
            // Best effort: an unresolvable local host must not prevent monitoring.
            hostname = "unknown";
        }
        return "servers." + hostname + ".nuxeo.streams.";
    }

    /**
     * Starts a {@link LogStreamProcessor} on the topology and blocks, polling once per second, until it terminates.
     *
     * @param manager the log manager used by the processor
     * @return {@code true} when the processor terminated, {@code false} when the waiting thread was interrupted (the
     *         processor is then shut down and the interrupt flag restored)
     */
    protected boolean runProcessor(LogManager manager) {
        processor = new LogStreamProcessor(manager);
        Settings settings = new Settings(1, 1, getRecordCodec(codec));
        processor.init(topology, settings).start();
        while (!processor.isTerminated()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                processor.shutdown();
                return false;
            }
        }
        return true;
    }

}