001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.net.InetAddress; 022import java.net.UnknownHostException; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.stream.Collectors; 027 028import org.apache.commons.cli.CommandLine; 029import org.apache.commons.cli.Option; 030import org.apache.commons.cli.Options; 031import org.nuxeo.lib.stream.computation.Settings; 032import org.nuxeo.lib.stream.computation.StreamManager; 033import org.nuxeo.lib.stream.computation.StreamProcessor; 034import org.nuxeo.lib.stream.computation.Topology; 035import org.nuxeo.lib.stream.computation.log.LogStreamManager; 036import org.nuxeo.lib.stream.log.LogManager; 037import org.nuxeo.lib.stream.log.Name; 038 039/** 040 * Monitors consumer latencies to graphite 041 * 042 * @since 10.3 043 */ 044public class MonitorCommand extends Command { 045 046 public static final String COMPUTATION_NAME = "tools/LatencyMonitor"; 047 048 public static final String INPUT_STREAM = "log_null"; 049 050 public static final String INTERNAL_LOG_PREFIX = "_"; 051 052 protected static final String NAME = "monitor"; 053 054 protected static final String DEFAULT_INTERVAL = "60"; 055 056 protected static final String DEFAULT_COUNT = "-1"; 057 058 protected static final String ALL_LOGS = "all"; 059 060 protected static final String DEFAULT_PORT = "2003"; 061 062 protected boolean verbose = false; 063 064 protected boolean partition = false; 065 066 protected List<Name> logNames; 067 068 protected int interval; 069 070 protected int count; 071 072 protected Topology topology; 073 074 protected StreamProcessor processor; 075 076 protected String codec; 077 078 protected String host; 079 080 protected int port; 081 082 protected boolean udp; 083 084 protected String prefix; 085 086 @Override 087 public String name() { 088 return NAME; 089 } 090 091 @Override 092 public void updateOptions(Options options) { 093 options.addOption(Option.builder("l") 094 .longOpt("log-name") 095 .desc("Monitor consumers latency for this LOG, must be a computation Record, " 096 + "can be a comma separated list of log names or ALL") 097 .required() 098 .hasArg() 099 .argName("LOG_NAME") 100 .build()); 101 options.addOption(Option.builder("h") 102 .longOpt("host") 103 .desc("The carbon server host") 104 .required() 105 .hasArg() 106 .argName("HOST") 107 .build()); 108 options.addOption(Option.builder("p") 109 .longOpt("port") 110 .desc("The carbon server port if not 2003") 111 .hasArg() 112 .argName("PORT") 113 .build()); 114 options.addOption("u", "udp", false, "Carbon instance is listening using UDP"); 115 options.addOption(Option.builder().longOpt("partition").desc("Report metrics for each partition").build()); 116 options.addOption(Option.builder("i") 117 .longOpt("interval") 118 .desc("send latency spaced at the specified interval in seconds") 119 .hasArg() 120 .argName("INTERVAL") 121 .build()); 122 options.addOption(Option.builder("c") 123 .longOpt("count") 124 .desc("number of times the latency information is sent") 125 .hasArg() 126 .argName("COUNT") 127 .build()); 128 options.addOption(Option.builder() 129 .longOpt("prefix") 130 .desc("The metric prefix to use if not server.<hostname>.nuxeo.streams.") 131 .hasArg() 132 .argName("PREFIX") 133 .build()); 134 options.addOption(Option.builder() 135 .longOpt("codec") 136 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 137 .hasArg() 138 .argName("CODEC") 139 .build()); 140 options.addOption(Option.builder().longOpt("verbose").build()); 141 } 142 143 @Override 144 public boolean run(LogManager manager, CommandLine cmd) { 145 logNames = getLogNames(manager, cmd.getOptionValue("log-name")); 146 codec = cmd.getOptionValue("codec"); 147 partition = cmd.hasOption("partition"); 148 verbose = cmd.hasOption("verbose"); 149 interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL)); 150 count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT)); 151 port = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_PORT)); 152 host = cmd.getOptionValue("host"); 153 udp = cmd.hasOption("udp"); 154 prefix = cmd.getOptionValue("prefix", getDefaultPrefix()); 155 initTopology(manager); 156 return runProcessor(manager); 157 } 158 159 protected List<Name> getLogNames(LogManager manager, String names) { 160 if (ALL_LOGS.equalsIgnoreCase(names)) { 161 return manager.listAllNames() 162 .stream() 163 .filter(name -> !name.getUrn().startsWith(INTERNAL_LOG_PREFIX)) 164 .filter(name -> !name.getUrn().startsWith(INPUT_STREAM)) 165 .collect(Collectors.toList()); 166 } 167 List<Name> ret = Arrays.asList(names.split(",")).stream().map(Name::ofUrn).collect(Collectors.toList()); 168 if (ret.isEmpty()) { 169 throw new IllegalArgumentException("No log name provided or found."); 170 } 171 for (Name name : ret) { 172 if (!manager.exists(name)) { 173 throw new IllegalArgumentException("Unknown log name: " + name); 174 } 175 } 176 return ret; 177 } 178 179 protected void initTopology(LogManager manager) { 180 topology = Topology.builder() 181 .addComputation( 182 () -> new LatencyMonitorComputation(manager, logNames, host, port, udp, prefix, 183 COMPUTATION_NAME, interval, count, partition, verbose, 184 getRecordCodec(codec)), 185 Collections.singletonList("i1:" + INPUT_STREAM)) 186 .build(); 187 } 188 189 public String getDefaultPrefix() { 190 String hostname; 191 try { 192 hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0]; 193 } catch (UnknownHostException e) { 194 hostname = "unknown"; 195 } 196 return "servers." + hostname + ".nuxeo.streams."; 197 } 198 199 protected boolean runProcessor(LogManager manager) { 200 StreamManager streamManager = new LogStreamManager(manager); 201 Settings settings = new Settings(1, 1, getRecordCodec(codec)); 202 processor = streamManager.registerAndCreateProcessor("monitor", topology, settings); 203 processor.start(); 204 while (!processor.isTerminated()) { 205 try { 206 Thread.sleep(1000); 207 } catch (InterruptedException e) { 208 Thread.currentThread().interrupt(); 209 processor.shutdown(); 210 return false; 211 } 212 } 213 return true; 214 } 215 216}