001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.List;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.commons.cli.CommandLine;
025import org.apache.commons.cli.Option;
026import org.apache.commons.cli.Options;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.lib.stream.log.LogLag;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.Name;
032
033/**
034 * Display the current lags of consumers.
035 *
036 * @since 9.3
037 */
038public class LagCommand extends Command {
039    private static final Log log = LogFactory.getLog(LagCommand.class);
040
041    protected static final String NAME = "lag";
042
043    protected boolean verbose = false;
044
045    protected boolean quiet = false;
046
047    @Override
048    public String name() {
049        return NAME;
050    }
051
052    @Override
053    public void updateOptions(Options options) {
054        options.addOption(
055                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
056        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
057        options.addOption(Option.builder("q").longOpt("quiet").desc("No output for consumer group without lag").build());
058    }
059
060    @Override
061    public boolean run(LogManager manager, CommandLine cmd) {
062        String logName = cmd.getOptionValue("log-name");
063        verbose = cmd.hasOption("verbose");
064        quiet = cmd.hasOption("quiet");
065        if (logName != null) {
066            lag(manager, Name.ofUrn(logName));
067        } else {
068            lag(manager);
069        }
070        return true;
071    }
072
073    protected void lag(LogManager manager) {
074        log.info("# " + manager);
075        for (Name name : manager.listAllNames()) {
076            lag(manager, name);
077        }
078    }
079
080    protected void lag(LogManager manager, Name name) {
081        log.info("## Log: " + name + " partitions: " + manager.size(name));
082        List<Name> consumers = manager.listConsumerGroups(name);
083        if (verbose && consumers.isEmpty()) {
084            // add a fake group to get info on end positions
085            consumers.add(Name.ofUrn("admin/tools"));
086        }
087        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
088    }
089
090    protected void renderLag(Name group, List<LogLag> lags) {
091        LogLag all = LogLag.of(lags);
092        if (quiet && all.lag() == 0) {
093            log.info("### Group: " + group + " no lag end: " + all.upper());
094            return;
095        }
096        log.info("### Group: " + group);
097        log.info("| partition | lag | pos | end | posOffset | endOffset |\n"
098                + "| --- | ---: | ---: | ---: | ---: | ---: |");
099        log.info(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(), all.lowerOffset(),
100                all.upperOffset()));
101        if (verbose && lags.size() > 1) {
102            AtomicInteger i = new AtomicInteger();
103            lags.forEach(lag -> log.info(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(), lag.lag(),
104                    lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
105        }
106    }
107
108}