001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.net.InetAddress;
022import java.net.UnknownHostException;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.stream.Collectors;
027
028import org.apache.commons.cli.CommandLine;
029import org.apache.commons.cli.Option;
030import org.apache.commons.cli.Options;
031import org.nuxeo.lib.stream.computation.Settings;
032import org.nuxeo.lib.stream.computation.StreamManager;
033import org.nuxeo.lib.stream.computation.StreamProcessor;
034import org.nuxeo.lib.stream.computation.Topology;
035import org.nuxeo.lib.stream.computation.log.LogStreamManager;
036import org.nuxeo.lib.stream.log.LogManager;
037import org.nuxeo.lib.stream.log.Name;
038
039/**
040 * Monitors consumer latencies to graphite
041 *
042 * @since 10.3
043 */
044public class MonitorCommand extends Command {
045
046    public static final String COMPUTATION_NAME = "tools/LatencyMonitor";
047
048    public static final String INPUT_STREAM = "log_null";
049
050    public static final String INTERNAL_LOG_PREFIX = "_";
051
052    protected static final String NAME = "monitor";
053
054    protected static final String DEFAULT_INTERVAL = "60";
055
056    protected static final String DEFAULT_COUNT = "-1";
057
058    protected static final String ALL_LOGS = "all";
059
060    protected static final String DEFAULT_PORT = "2003";
061
062    protected boolean verbose = false;
063
064    protected boolean partition = false;
065
066    protected List<Name> logNames;
067
068    protected int interval;
069
070    protected int count;
071
072    protected Topology topology;
073
074    protected StreamProcessor processor;
075
076    protected String codec;
077
078    protected String host;
079
080    protected int port;
081
082    protected boolean udp;
083
084    protected String prefix;
085
086    @Override
087    public String name() {
088        return NAME;
089    }
090
091    @Override
092    public void updateOptions(Options options) {
093        options.addOption(Option.builder("l")
094                                .longOpt("log-name")
095                                .desc("Monitor consumers latency for this LOG, must be a computation Record, "
096                                        + "can be a comma separated list of log names or ALL")
097                                .required()
098                                .hasArg()
099                                .argName("LOG_NAME")
100                                .build());
101        options.addOption(Option.builder("h")
102                                .longOpt("host")
103                                .desc("The carbon server host")
104                                .required()
105                                .hasArg()
106                                .argName("HOST")
107                                .build());
108        options.addOption(Option.builder("p")
109                                .longOpt("port")
110                                .desc("The carbon server port if not 2003")
111                                .hasArg()
112                                .argName("PORT")
113                                .build());
114        options.addOption("u", "udp", false, "Carbon instance is listening using UDP");
115        options.addOption(Option.builder().longOpt("partition").desc("Report metrics for each partition").build());
116        options.addOption(Option.builder("i")
117                                .longOpt("interval")
118                                .desc("send latency spaced at the specified interval in seconds")
119                                .hasArg()
120                                .argName("INTERVAL")
121                                .build());
122        options.addOption(Option.builder("c")
123                                .longOpt("count")
124                                .desc("number of times the latency information is sent")
125                                .hasArg()
126                                .argName("COUNT")
127                                .build());
128        options.addOption(Option.builder()
129                                .longOpt("prefix")
130                                .desc("The metric prefix to use if not server.<hostname>.nuxeo.streams.")
131                                .hasArg()
132                                .argName("PREFIX")
133                                .build());
134        options.addOption(Option.builder()
135                                .longOpt("codec")
136                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
137                                .hasArg()
138                                .argName("CODEC")
139                                .build());
140        options.addOption(Option.builder().longOpt("verbose").build());
141    }
142
143    @Override
144    public boolean run(LogManager manager, CommandLine cmd) {
145        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
146        codec = cmd.getOptionValue("codec");
147        partition = cmd.hasOption("partition");
148        verbose = cmd.hasOption("verbose");
149        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
150        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
151        port = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_PORT));
152        host = cmd.getOptionValue("host");
153        udp = cmd.hasOption("udp");
154        prefix = cmd.getOptionValue("prefix", getDefaultPrefix());
155        initTopology(manager);
156        return runProcessor(manager);
157    }
158
159    protected List<Name> getLogNames(LogManager manager, String names) {
160        if (ALL_LOGS.equalsIgnoreCase(names)) {
161            return manager.listAllNames()
162                          .stream()
163                          .filter(name -> !name.getUrn().startsWith(INTERNAL_LOG_PREFIX))
164                          .filter(name -> !name.getUrn().startsWith(INPUT_STREAM))
165                          .collect(Collectors.toList());
166        }
167        List<Name> ret = Arrays.asList(names.split(",")).stream().map(Name::ofUrn).collect(Collectors.toList());
168        if (ret.isEmpty()) {
169            throw new IllegalArgumentException("No log name provided or found.");
170        }
171        for (Name name : ret) {
172            if (!manager.exists(name)) {
173                throw new IllegalArgumentException("Unknown log name: " + name);
174            }
175        }
176        return ret;
177    }
178
179    protected void initTopology(LogManager manager) {
180        topology = Topology.builder()
181                           .addComputation(
182                                   () -> new LatencyMonitorComputation(manager, logNames, host, port, udp, prefix,
183                                           COMPUTATION_NAME, interval, count, partition, verbose,
184                                           getRecordCodec(codec)),
185                                   Collections.singletonList("i1:" + INPUT_STREAM))
186                           .build();
187    }
188
189    public String getDefaultPrefix() {
190        String hostname;
191        try {
192            hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0];
193        } catch (UnknownHostException e) {
194            hostname = "unknown";
195        }
196        return "servers." + hostname + ".nuxeo.streams.";
197    }
198
199    protected boolean runProcessor(LogManager manager) {
200        StreamManager streamManager = new LogStreamManager(manager);
201        Settings settings = new Settings(1, 1, getRecordCodec(codec));
202        processor = streamManager.registerAndCreateProcessor("monitor", topology, settings);
203        processor.start();
204        while (!processor.isTerminated()) {
205            try {
206                Thread.sleep(1000);
207            } catch (InterruptedException e) {
208                Thread.currentThread().interrupt();
209                processor.shutdown();
210                return false;
211            }
212        }
213        return true;
214    }
215
216}