001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.Arrays;
022import java.util.List;
023import java.util.stream.Collectors;
024
025import org.apache.commons.cli.CommandLine;
026import org.apache.commons.cli.Option;
027import org.apache.commons.cli.Options;
028import org.nuxeo.lib.stream.computation.Settings;
029import org.nuxeo.lib.stream.computation.Topology;
030import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
031import org.nuxeo.lib.stream.log.LogManager;
032
033/**
034 * Track consumer positions so they can be restored in case of fail-over
035 *
036 * @since 10.1
037 */
038public class TrackerCommand extends Command {
039
040    public static final String COMPUTATION_NAME = "LatencyTracker";
041
042    public static final String INPUT_STREAM = "log_null";
043
044    public static final String INTERNAL_LOG_PREFIX = "_";
045
046    protected static final String NAME = "tracker";
047
048    protected static final String DEFAULT_INTERVAL = "60";
049
050    protected static final String DEFAULT_COUNT = "-1";
051
052    protected static final String ALL_LOGS = "all";
053
054    protected static final String DEFAULT_LATENCIES_LOG = "_consumer_latencies";
055
056    protected boolean verbose = false;
057
058    protected String output;
059
060    protected List<String> logNames;
061
062    protected int interval;
063
064    protected int count;
065
066    protected Topology topology;
067
068    protected LogStreamProcessor processor;
069
070    protected String codec;
071
072    @Override
073    public String name() {
074        return NAME;
075    }
076
077    @Override
078    public void updateOptions(Options options) {
079        options.addOption(Option.builder("l")
080                                .longOpt("log-name")
081                                .desc("Track consumers latency for this LOG, must be a computation Record, "
082                                        + "can be a comma separated list of log names or ALL")
083                                .required()
084                                .hasArg()
085                                .argName("LOG_NAME")
086                                .build());
087        options.addOption(Option.builder("o")
088                                .longOpt("log-output")
089                                .desc("Log name of the output")
090                                .hasArg()
091                                .argName("LOG_OUTPUT")
092                                .build());
093        options.addOption(Option.builder("i")
094                                .longOpt("interval")
095                                .desc("send latency spaced at the specified interval in seconds")
096                                .hasArg()
097                                .argName("INTERVAL")
098                                .build());
099        options.addOption(Option.builder("c")
100                                .longOpt("count")
101                                .desc("number of time to send the latency information")
102                                .hasArg()
103                                .argName("COUNT")
104                                .build());
105        options.addOption(Option.builder()
106                                .longOpt("codec")
107                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
108                                .hasArg()
109                                .argName("CODEC")
110                                .build());
111        options.addOption(Option.builder().longOpt("verbose").build());
112    }
113
114    @Override
115    public boolean run(LogManager manager, CommandLine cmd) {
116        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
117        output = cmd.getOptionValue("log-output", DEFAULT_LATENCIES_LOG);
118        codec = cmd.getOptionValue("codec");
119        verbose = cmd.hasOption("verbose");
120        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
121        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
122
123        initTopology(manager);
124        return runProcessor(manager);
125    }
126
127    protected List<String> getLogNames(LogManager manager, String names) {
128        if (ALL_LOGS.equalsIgnoreCase(names)) {
129            return manager.listAll().stream().filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX)).collect(
130                    Collectors.toList());
131        }
132        List<String> ret = Arrays.asList(names.split(","));
133        for (String name : ret) {
134            if (!manager.exists(name)) {
135                throw new IllegalArgumentException("Unknown log name: " + name);
136            }
137        }
138        return ret;
139    }
140
141    protected void initTopology(LogManager manager) {
142        topology = Topology.builder()
143                           .addComputation(
144                                   () -> new LatencyTrackerComputation(manager, logNames, COMPUTATION_NAME, interval,
145                                           count, verbose, getRecordCodec(codec)),
146                                   Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output))
147                           .build();
148    }
149
150    protected boolean runProcessor(LogManager manager) {
151        processor = new LogStreamProcessor(manager);
152        Settings settings = new Settings(1, 1, getRecordCodec(codec));
153        processor.init(topology, settings).start();
154        while (!processor.isTerminated()) {
155            try {
156                Thread.sleep(1000);
157            } catch (InterruptedException e) {
158                Thread.currentThread().interrupt();
159                processor.shutdown();
160                return false;
161            }
162        }
163        return true;
164    }
165
166}