001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.List;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.commons.cli.CommandLine;
025import org.apache.commons.cli.Option;
026import org.apache.commons.cli.Options;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.lib.stream.log.LogLag;
030import org.nuxeo.lib.stream.log.LogManager;
031
032/**
033 * Display the current lags of consumers.
034 *
035 * @since 9.3
036 */
037public class LagCommand extends Command {
038    private static final Log log = LogFactory.getLog(LagCommand.class);
039
040    protected static final String NAME = "lag";
041
042    protected boolean verbose = false;
043
044    @Override
045    public String name() {
046        return NAME;
047    }
048
049    @Override
050    public void updateOptions(Options options) {
051        options.addOption(
052                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
053        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
054    }
055
056    @Override
057    public boolean run(LogManager manager, CommandLine cmd) {
058        String name = cmd.getOptionValue("log-name");
059        verbose = cmd.hasOption("verbose");
060        if (name != null) {
061            lag(manager, name);
062        } else {
063            lag(manager);
064        }
065        return true;
066    }
067
068    protected void lag(LogManager manager) {
069        log.info("# " + manager);
070        for (String name : manager.listAll()) {
071            lag(manager, name);
072        }
073    }
074
075    protected void lag(LogManager manager, String name) {
076        log.info("## Log: " + name + " partitions: " + manager.size(name));
077        List<String> consumers = manager.listConsumerGroups(name);
078        if (verbose && consumers.isEmpty()) {
079            // add a fake group to get info on end positions
080            consumers.add("tools");
081        }
082        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
083    }
084
085    protected void renderLag(String group, List<LogLag> lags) {
086        log.info("### Group: " + group);
087        log.info("| partition | lag | pos | end | posOffset | endOffset |\n"
088                + "| --- | ---: | ---: | ---: | ---: | ---: |");
089        LogLag all = LogLag.of(lags);
090        log.info(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(), all.lowerOffset(),
091                all.upperOffset()));
092        if (verbose && lags.size() > 1) {
093            AtomicInteger i = new AtomicInteger();
094            lags.forEach(lag -> log.info(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(), lag.lag(),
095                    lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
096        }
097    }
098
099}