001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.Arrays;
022import java.util.List;
023import java.util.stream.Collectors;
024
025import org.apache.commons.cli.CommandLine;
026import org.apache.commons.cli.Option;
027import org.apache.commons.cli.Options;
028import org.nuxeo.lib.stream.computation.Settings;
029import org.nuxeo.lib.stream.computation.StreamManager;
030import org.nuxeo.lib.stream.computation.StreamProcessor;
031import org.nuxeo.lib.stream.computation.Topology;
032import org.nuxeo.lib.stream.computation.log.LogStreamManager;
033import org.nuxeo.lib.stream.log.LogManager;
034import org.nuxeo.lib.stream.log.Name;
035
036/**
037 * Track consumer positions so they can be restored in case of fail-over
038 *
039 * @since 10.1
040 */
041public class TrackerCommand extends Command {
042
043    public static final String COMPUTATION_NAME = "tools/LatencyTracker";
044
045    public static final String INPUT_STREAM = "input/null";
046
047    public static final String INTERNAL_LOG_PREFIX = "_";
048
049    protected static final String NAME = "tracker";
050
051    protected static final String DEFAULT_INTERVAL = "60";
052
053    protected static final String DEFAULT_COUNT = "-1";
054
055    protected static final String ALL_LOGS = "all";
056
057    protected static final String DEFAULT_LATENCIES_LOG = "_consumer_latencies";
058
059    protected boolean verbose = false;
060
061    protected String output;
062
063    protected List<Name> logNames;
064
065    protected int interval;
066
067    protected int count;
068
069    protected Topology topology;
070
071    protected StreamProcessor processor;
072
073    protected String codec;
074
075    @Override
076    public String name() {
077        return NAME;
078    }
079
080    @Override
081    public void updateOptions(Options options) {
082        options.addOption(Option.builder("l")
083                                .longOpt("log-name")
084                                .desc("Track consumers latency for this LOG, must be a computation Record, "
085                                        + "can be a comma separated list of log names or ALL")
086                                .required()
087                                .hasArg()
088                                .argName("LOG_NAME")
089                                .build());
090        options.addOption(Option.builder("o")
091                                .longOpt("log-output")
092                                .desc("Log name of the output")
093                                .hasArg()
094                                .argName("LOG_OUTPUT")
095                                .build());
096        options.addOption(Option.builder("i")
097                                .longOpt("interval")
098                                .desc("send latency spaced at the specified interval in seconds")
099                                .hasArg()
100                                .argName("INTERVAL")
101                                .build());
102        options.addOption(Option.builder("c")
103                                .longOpt("count")
104                                .desc("number of time to send the latency information")
105                                .hasArg()
106                                .argName("COUNT")
107                                .build());
108        options.addOption(Option.builder()
109                                .longOpt("codec")
110                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
111                                .hasArg()
112                                .argName("CODEC")
113                                .build());
114        options.addOption(Option.builder().longOpt("verbose").build());
115    }
116
117    @Override
118    public boolean run(LogManager manager, CommandLine cmd) {
119        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
120        output = cmd.getOptionValue("log-output", DEFAULT_LATENCIES_LOG);
121        codec = cmd.getOptionValue("codec");
122        verbose = cmd.hasOption("verbose");
123        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
124        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
125
126        initTopology(manager);
127        return runProcessor(manager);
128    }
129
130    protected List<Name> getLogNames(LogManager manager, String names) {
131        if (ALL_LOGS.equalsIgnoreCase(names)) {
132            return manager.listAllNames().stream().filter(name -> !INPUT_STREAM.equals(name.getName()))
133                          .collect(Collectors.toList());
134        }
135        List<Name> ret = Arrays.stream(names.split(",")).map(Name::ofUrn).collect(Collectors.toList());
136        if (ret.isEmpty()) {
137            throw new IllegalArgumentException("No log name provided or found.");
138        }
139        for (Name name : ret) {
140            if (!manager.exists(name)) {
141                throw new IllegalArgumentException("Unknown log name: " + name);
142            }
143        }
144        return ret;
145    }
146
147    protected void initTopology(LogManager manager) {
148        topology = Topology.builder()
149                           .addComputation(
150                                   () -> new LatencyTrackerComputation(manager, logNames, COMPUTATION_NAME, interval,
151                                           count, verbose, getRecordCodec(codec), 1),
152                                   Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output))
153                           .build();
154    }
155
156    protected boolean runProcessor(LogManager manager) {
157        StreamManager streamManager = new LogStreamManager(manager);
158        Settings settings = new Settings(1, 1, getRecordCodec(codec));
159        processor = streamManager.registerAndCreateProcessor("tracker", topology, settings);
160        processor.start();
161        while (!processor.isTerminated()) {
162            try {
163                Thread.sleep(1000);
164            } catch (InterruptedException e) {
165                Thread.currentThread().interrupt();
166                processor.shutdown();
167                return false;
168            }
169        }
170        return true;
171    }
172
173}