001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.util.List;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.commons.cli.CommandLine;
025import org.apache.commons.cli.Option;
026import org.apache.commons.cli.Options;
027import org.nuxeo.lib.stream.log.LogLag;
028import org.nuxeo.lib.stream.log.LogManager;
029
030/**
031 * @since 9.3
032 */
033public class LagCommand extends Command {
034
035    protected static final String NAME = "lag";
036
037    protected boolean verbose = false;
038
039    @Override
040    public String name() {
041        return NAME;
042    }
043
044    @Override
045    public void updateOptions(Options options) {
046        options.addOption(
047                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
048        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
049    }
050
051    @Override
052    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
053        String name = cmd.getOptionValue("log-name");
054        verbose = cmd.hasOption("verbose");
055        if (name != null) {
056            lag(manager, name);
057        } else {
058            lag(manager);
059        }
060        return true;
061    }
062
063    protected void lag(LogManager manager) {
064        System.out.println("# " + manager);
065        for (String name : manager.listAll()) {
066            lag(manager, name);
067        }
068    }
069
070    protected void lag(LogManager manager, String name) {
071        System.out.println("## Log: " + name + " partitions: " + manager.getAppender(name).size());
072        List<String> consumers = manager.listConsumerGroups(name);
073        if (verbose && consumers.isEmpty()) {
074            // add a fake group to get info on end positions
075            consumers.add("tools");
076        }
077        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
078    }
079
080    protected void renderLag(String group, List<LogLag> lags) {
081        System.out.println("### Group: " + group);
082        System.out.println(
083                "| partition | lag | pos | end | posOffset | endOffset |\n" + "| --- | ---: | ---: | ---: | ---: |");
084        LogLag all = LogLag.of(lags);
085        System.out.println(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(), all.lowerOffset(),
086                all.upperOffset()));
087        if (verbose && lags.size() > 1) {
088            AtomicInteger i = new AtomicInteger();
089            lags.forEach(lag -> System.out.println(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(), lag.lag(),
090                    lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
091        }
092    }
093
094}