001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.Arrays;
022import java.util.List;
023import java.util.stream.Collectors;
024
025import org.apache.commons.cli.CommandLine;
026import org.apache.commons.cli.Option;
027import org.apache.commons.cli.Options;
028import org.nuxeo.lib.stream.computation.Settings;
029import org.nuxeo.lib.stream.computation.Topology;
030import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
031import org.nuxeo.lib.stream.log.LogManager;
032
033/**
034 * Track consumer positions so they can be restored in case of fail-over
035 *
036 * @since 10.1
037 */
038public class TrackerCommand extends Command {
039
040    public static final String COMPUTATION_NAME = "LatencyTracker";
041
042    public static final String INPUT_STREAM = "log_null";
043
044    public static final String INTERNAL_LOG_PREFIX = "_";
045
046    protected static final String NAME = "tracker";
047
048    protected static final String DEFAULT_INTERVAL = "60";
049
050    protected static final String DEFAULT_COUNT = "-1";
051
052    protected static final String ALL_LOGS = "all";
053
054    protected static final String DEFAULT_LATENCIES_LOG = "_consumer_latencies";
055
056    protected boolean verbose = false;
057
058    protected String output;
059
060    protected List<String> logNames;
061
062    protected int interval;
063
064    protected int count;
065
066    protected Topology topology;
067
068    protected LogStreamProcessor processor;
069
070    protected String codec;
071
072    @Override
073    public String name() {
074        return NAME;
075    }
076
077    @Override
078    public void updateOptions(Options options) {
079        options.addOption(Option.builder("l")
080                                .longOpt("log-name")
081                                .desc("Track consumers latency for this LOG, must be a computation Record, "
082                                        + "can be a comma separated list of log names or ALL")
083                                .required()
084                                .hasArg()
085                                .argName("LOG_NAME")
086                                .build());
087        options.addOption(Option.builder("o")
088                                .longOpt("log-output")
089                                .desc("Log name of the output")
090                                .hasArg()
091                                .argName("LOG_OUTPUT")
092                                .build());
093        options.addOption(Option.builder("i")
094                                .longOpt("interval")
095                                .desc("send latency spaced at the specified interval in seconds")
096                                .hasArg()
097                                .argName("INTERVAL")
098                                .build());
099        options.addOption(Option.builder("c")
100                                .longOpt("count")
101                                .desc("number of time to send the latency information")
102                                .hasArg()
103                                .argName("COUNT")
104                                .build());
105        options.addOption(Option.builder()
106                                .longOpt("codec")
107                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
108                                .hasArg()
109                                .argName("CODEC")
110                                .build());
111        options.addOption(Option.builder().longOpt("verbose").build());
112    }
113
114    @Override
115    public boolean run(LogManager manager, CommandLine cmd) {
116        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
117        output = cmd.getOptionValue("log-output", DEFAULT_LATENCIES_LOG);
118        codec = cmd.getOptionValue("codec");
119        verbose = cmd.hasOption("verbose");
120        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
121        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
122
123        initTopology(manager);
124        return runProcessor(manager);
125    }
126
127    protected List<String> getLogNames(LogManager manager, String names) {
128        if (ALL_LOGS.equalsIgnoreCase(names)) {
129            return manager.listAll()
130                          .stream()
131                          .filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX))
132                          .collect(Collectors.toList());
133        }
134        List<String> ret = Arrays.asList(names.split(","));
135        for (String name : ret) {
136            if (!manager.exists(name)) {
137                throw new IllegalArgumentException("Unknown log name: " + name);
138            }
139        }
140        return ret;
141    }
142
143    protected void initTopology(LogManager manager) {
144        topology = Topology.builder()
145                           .addComputation(
146                                   () -> new LatencyTrackerComputation(manager, logNames, COMPUTATION_NAME, interval,
147                                           count, verbose, getRecordCodec(codec)),
148                                   Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output))
149                           .build();
150    }
151
152    protected boolean runProcessor(LogManager manager) {
153        processor = new LogStreamProcessor(manager);
154        Settings settings = new Settings(1, 1, getRecordCodec(codec));
155        processor.init(topology, settings).start();
156        while (!processor.isTerminated()) {
157            try {
158                Thread.sleep(1000);
159            } catch (InterruptedException e) {
160                Thread.currentThread().interrupt();
161                processor.shutdown();
162                return false;
163            }
164        }
165        return true;
166    }
167
168}