/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;

/**
 * Command named {@code lag} that prints, on standard output, the lag of each consumer group of one log or of all logs
 * known to a {@link LogManager}. The output is laid out as Markdown headings and tables.
 *
 * @since 9.3
 */
public class LagCommand extends Command {

    protected static final String NAME = "lag";

    /** When {@code true}, one table row per partition is printed in addition to the aggregated row. */
    protected boolean verbose = false;

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command: {@code -l/--log-name LOG_NAME} and {@code --verbose}.
     *
     * @param options the option set to extend
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(
                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
    }

    /**
     * Prints the lag of the log given by {@code --log-name}, or of every log listed by the manager when the option is
     * absent.
     *
     * @param manager the log manager to query
     * @param cmd the parsed command line
     * @return always {@code true}
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        String name = cmd.getOptionValue("log-name");
        verbose = cmd.hasOption("verbose");
        if (name != null) {
            lag(manager, name);
        } else {
            lag(manager);
        }
        return true;
    }

    /**
     * Prints a top-level heading for the manager, then the lag of every log returned by {@code manager.listAll()}.
     *
     * @param manager the log manager to query
     */
    protected void lag(LogManager manager) {
        System.out.println("# " + manager);
        for (String name : manager.listAll()) {
            lag(manager, name);
        }
    }

    /**
     * Prints the partition count of a log followed by one lag table per consumer group.
     *
     * @param manager the log manager to query
     * @param name the name of the log
     */
    protected void lag(LogManager manager, String name) {
        System.out.println("## Log: " + name + " partitions: " + manager.getAppender(name).size());
        // Work on a private copy: the list handed back by the manager may be unmodifiable or shared,
        // and a fake group may be appended below.
        List<String> consumers = new ArrayList<>(manager.listConsumerGroups(name));
        if (verbose && consumers.isEmpty()) {
            // add a fake group to get info on end positions
            consumers.add("tools");
        }
        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
    }

    /**
     * Prints the lag table of a consumer group: an aggregated {@code All} row and, in verbose mode when there is more
     * than one entry, one row per entry of {@code lags} labelled with its index in the list.
     *
     * @param group the consumer group name
     * @param lags the lags of the group, one entry per partition
     */
    protected void renderLag(String group, List<LogLag> lags) {
        System.out.println("### Group: " + group);
        // The delimiter row must have as many cells as the header row (6) for the table to be valid Markdown.
        System.out.println("| partition | lag | pos | end | posOffset | endOffset |\n"
                + "| --- | ---: | ---: | ---: | ---: | ---: |");
        LogLag all = LogLag.of(lags);
        System.out.println(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(),
                all.lowerOffset(), all.upperOffset()));
        if (verbose && lags.size() > 1) {
            // Counter usable from within the lambda; it provides the row label for each entry.
            AtomicInteger i = new AtomicInteger();
            lags.forEach(lag -> System.out.println(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(),
                    lag.lag(), lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
        }
    }

}