/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.Name;

/**
 * Display the current lags of consumers.
 * <p>
 * The output is written through the class logger at info level, as Markdown headings and tables.
 *
 * @since 9.3
 */
public class LagCommand extends Command {
    private static final Log log = LogFactory.getLog(LagCommand.class);

    protected static final String NAME = "lag";

    /** Set from the {@code --verbose} option: also render one table row per partition. */
    protected boolean verbose = false;

    /** Set from the {@code --quiet} option: render a single summary line for groups whose total lag is 0. */
    protected boolean quiet = false;

    /**
     * @return the command name, {@value #NAME}
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command: {@code -l/--log-name LOG_NAME}, {@code --verbose} and
     * {@code -q/--quiet}.
     *
     * @param options the option set to add this command's options to
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(
                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
        options.addOption(
                Option.builder("q").longOpt("quiet").desc("No output for consumer group without lag").build());
    }

    /**
     * Renders the lag of a single log when {@code --log-name} is given, of every log known to the manager
     * otherwise.
     *
     * @param manager the log manager to query
     * @param cmd the parsed command line
     * @return always {@code true}
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        String logName = cmd.getOptionValue("log-name");
        verbose = cmd.hasOption("verbose");
        quiet = cmd.hasOption("quiet");
        if (logName != null) {
            lag(manager, Name.ofUrn(logName));
        } else {
            lag(manager);
        }
        return true;
    }

    /**
     * Renders the lag of every log name listed by the manager, preceded by a heading identifying the manager.
     *
     * @param manager the log manager to query
     */
    protected void lag(LogManager manager) {
        log.info("# " + manager);
        for (Name name : manager.listAllNames()) {
            lag(manager, name);
        }
    }

    /**
     * Renders the lag of every consumer group of the given log.
     * <p>
     * In verbose mode, a log without any consumer group is rendered with a placeholder group so that its end
     * positions are still displayed.
     *
     * @param manager the log manager to query
     * @param name the name of the log
     */
    protected void lag(LogManager manager, Name name) {
        log.info("## Log: " + name + " partitions: " + manager.size(name));
        // Work on a private copy: the list handed back by the manager is not ours to modify and is not
        // guaranteed to support add().
        List<Name> consumers = new ArrayList<>(manager.listConsumerGroups(name));
        if (verbose && consumers.isEmpty()) {
            // add a fake group to get info on end positions
            consumers.add(Name.ofUrn("admin/tools"));
        }
        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
    }

    /**
     * Renders the lag of one consumer group.
     * <p>
     * In quiet mode a group whose aggregated lag is 0 gets a single summary line. Otherwise a Markdown table is
     * logged with an {@code All} row holding the aggregated values and, in verbose mode when there is more than
     * one entry in {@code lags}, one additional row per entry numbered from 0 in list order.
     *
     * @param group the consumer group name
     * @param lags the lags of the group, one entry per partition
     */
    protected void renderLag(Name group, List<LogLag> lags) {
        LogLag all = LogLag.of(lags);
        if (quiet && all.lag() == 0) {
            log.info("### Group: " + group + " no lag end: " + all.upper());
            return;
        }
        log.info("### Group: " + group);
        log.info("| partition | lag | pos | end | posOffset | endOffset |\n"
                + "| --- | ---: | ---: | ---: | ---: | ---: |");
        log.info(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(), all.lowerOffset(),
                all.upperOffset()));
        if (verbose && lags.size() > 1) {
            // counter used from the lambda to number the rows
            AtomicInteger i = new AtomicInteger();
            lags.forEach(lag -> log.info(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(), lag.lag(),
                    lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
        }
    }

}