/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;

/**
 * Display the current lags of consumers.
 * <p>
 * The output is written through the class logger at INFO level, as headings followed by a pipe-delimited table per
 * consumer group.
 *
 * @since 9.3
 */
public class LagCommand extends Command {
    private static final Log log = LogFactory.getLog(LagCommand.class);

    protected static final String NAME = "lag";

    /** When {@code true}, one table row per partition is rendered in addition to the aggregated "All" row. */
    protected boolean verbose = false;

    /**
     * Returns the name of this command.
     *
     * @return the constant {@link #NAME}
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command: {@code -l/--log-name LOG_NAME} and {@code --verbose}.
     *
     * @param options the option set to add to
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(
                Option.builder("l").longOpt("log-name").desc("Log name").hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder().longOpt("verbose").desc("Display lag for each partition").build());
    }

    /**
     * Renders the lag of a single log when {@code --log-name} is given, otherwise of every log listed by the manager.
     *
     * @param manager the log manager to query
     * @param cmd the parsed command line
     * @return always {@code true}
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        String name = cmd.getOptionValue("log-name");
        verbose = cmd.hasOption("verbose");
        if (name != null) {
            lag(manager, name);
        } else {
            lag(manager);
        }
        return true;
    }

    /**
     * Renders the lag of every log returned by {@code manager.listAll()}.
     *
     * @param manager the log manager to query
     */
    protected void lag(LogManager manager) {
        log.info("# " + manager);
        for (String name : manager.listAll()) {
            lag(manager, name);
        }
    }

    /**
     * Renders the lag of each consumer group of the given log.
     * <p>
     * In verbose mode, when the log has no consumer group, a fake {@code "tools"} group is used so that a table is
     * still rendered.
     *
     * @param manager the log manager to query
     * @param name the log name
     */
    protected void lag(LogManager manager, String name) {
        log.info("## Log: " + name + " partitions: " + manager.size(name));
        // Work on a private copy: the list handed out by the manager is not ours to mutate. Whether it is
        // unmodifiable or backed by manager state cannot be told from here, and List#add is an optional operation.
        List<String> consumers = new ArrayList<>(manager.listConsumerGroups(name));
        if (verbose && consumers.isEmpty()) {
            // add a fake group to get info on end positions
            consumers.add("tools");
        }
        consumers.forEach(group -> renderLag(group, manager.getLagPerPartition(name, group)));
    }

    /**
     * Logs a table for one consumer group: a header, an "All" row built from {@code LogLag.of(lags)} and, in verbose
     * mode when there is more than one entry, one row per entry of {@code lags}.
     *
     * @param group the consumer group name
     * @param lags the lags as returned by {@code LogManager.getLagPerPartition}
     */
    protected void renderLag(String group, List<LogLag> lags) {
        log.info("### Group: " + group);
        log.info("| partition | lag | pos | end | posOffset | endOffset |\n"
                + "| --- | ---: | ---: | ---: | ---: | ---: |");
        LogLag all = LogLag.of(lags);
        log.info(String.format("|All|%d|%d|%d|%d|%d|", all.lag(), all.lower(), all.upper(), all.lowerOffset(),
                all.upperOffset()));
        // With a single entry the per-partition row would be the only detail row, so it is skipped.
        if (verbose && lags.size() > 1) {
            // Counter used as the row label: the position of the entry in the list, starting at 0.
            AtomicInteger i = new AtomicInteger();
            lags.forEach(lag -> log.info(String.format("|%s|%d|%d|%d|%d|%d|", i.getAndIncrement(), lag.lag(),
                    lag.lower(), lag.upper(), lag.lowerOffset(), lag.upperOffset())));
        }
    }

}