/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;

/**
 * Display the current latencies of consumers.
 * <p>
 * The output is written through the class logger as Markdown-style headings and tables: one section per log, one
 * table per consumer group.
 *
 * @since 9.3
 */
public class LatencyCommand extends Command {
    private static final Log log = LogFactory.getLog(LatencyCommand.class);

    protected static final String NAME = "latency";

    /** Row layout shared by the aggregated "All" row and the per-partition rows; the first column is the label. */
    private static final String ROW_FORMAT = "|%s|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|";

    /** When set (by the {@code --verbose} option in {@link #run}), one row per partition is displayed as well. */
    protected boolean verbose = false;

    /**
     * Returns the name of this command.
     *
     * @return {@link #NAME}
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command: {@code -l/--log-name}, {@code --codec} and
     * {@code --verbose}.
     *
     * @param options the option set to complete
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Log name of a stream containing computation.Record")
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build());
    }

    /**
     * Displays the latencies of the log given by {@code --log-name}, or of every log listed by the manager when
     * that option is absent.
     *
     * @param manager the log manager to query
     * @param cmd the parsed command line
     * @return always {@code true}
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        String name = cmd.getOptionValue("log-name");
        // getRecordCodec is not defined in this file; presumably inherited from Command
        Codec<Record> codec = getRecordCodec(cmd.getOptionValue("codec"));
        verbose = cmd.hasOption("verbose");
        if (name != null) {
            latency(manager, name, codec);
        } else {
            latency(manager, codec);
        }
        return true;
    }

    /**
     * Displays the latencies of every log returned by {@code manager.listAll()}.
     *
     * @param manager the log manager to query
     * @param codec the codec used to read the records
     */
    protected void latency(LogManager manager, Codec<Record> codec) {
        log.info("# " + manager);
        for (String name : manager.listAll()) {
            latency(manager, name, codec);
        }
    }

    /**
     * Displays the latencies of each consumer group of a single log.
     * <p>
     * An {@link IllegalStateException} raised while computing or rendering the latencies is logged and stops the
     * processing of the remaining groups of this log.
     *
     * @param manager the log manager to query
     * @param name the log name
     * @param codec the codec used to read the records
     */
    protected void latency(LogManager manager, String name, Codec<Record> codec) {
        log.info("## Log: " + name + " partitions: " + manager.size(name));
        List<String> consumers = manager.listConsumerGroups(name);
        if (verbose && consumers.isEmpty()) {
            // Use a fake group to get info on end positions. The list returned by the manager is left untouched:
            // it may be unmodifiable or shared, so it is replaced locally instead of being mutated.
            consumers = Collections.singletonList("tools");
        }
        try {
            for (String group : consumers) {
                renderLatency(group, manager.<Record> getLatencyPerPartition(name, group, codec,
                        rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp(), Record::getKey));
            }
        } catch (IllegalStateException e) {
            // happen when this is not a stream of Record
            log.error(e.getMessage());
            // keep the stack trace available without cluttering the regular output
            log.debug("Unable to compute latency for log: " + name, e);
        }
    }

    /**
     * Logs the latency table of a consumer group: a header, an "All" row built from {@code Latency.of(latencies)}
     * and, in verbose mode when there is more than one entry, one row per entry labelled with its position in the
     * list (starting at 0).
     *
     * @param group the consumer group name
     * @param latencies the latencies of the group, as returned by {@code getLatencyPerPartition}
     */
    protected void renderLatency(String group, List<Latency> latencies) {
        log.info(String.format("### Group: %s", group));
        log.info(
                "| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n"
                        + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |");
        log.info(formatRow("All", Latency.of(latencies)));
        if (verbose && latencies.size() > 1) {
            AtomicInteger index = new AtomicInteger();
            latencies.forEach(lat -> log.info(formatRow(String.format("%d", index.getAndIncrement()), lat)));
        }
    }

    /**
     * Builds one table row for the given latency.
     *
     * @param label the content of the first column
     * @param lat the latency to render
     * @return the formatted row
     */
    private String formatRow(String label, Latency lat) {
        return String.format(ROW_FORMAT, label, lat.lag().lag(), lat.latency(), formatInterval(lat.latency()),
                lat.lower(), formatDate(lat.lower()), formatDate(lat.upper()), lat.lag().lower(),
                lat.lag().upper(), lat.lag().lowerOffset(), lat.lag().upperOffset(), lat.key());
    }

    /**
     * Formats an epoch timestamp in milliseconds as an ISO-8601 instant.
     *
     * @param timestamp milliseconds since the epoch
     * @return the {@link Instant} representation, or {@code "NA"} when the timestamp is not strictly positive
     */
    protected String formatDate(long timestamp) {
        if (timestamp > 0) {
            return Instant.ofEpochMilli(timestamp).toString();
        }
        return "NA";
    }

    /**
     * Formats a duration in milliseconds as {@code HH:mm:ss.SSS}; the hour field grows beyond two digits when
     * needed.
     * <p>
     * A negative duration is rendered as its magnitude prefixed by a single minus sign, for instance
     * {@code -00:00:01.500}.
     *
     * @param l the duration in milliseconds
     * @return the formatted duration, or {@code "NA"} when the duration is 0
     */
    protected static String formatInterval(final long l) {
        if (l == 0) {
            return "NA";
        }
        // Each field is reduced before taking its magnitude so that Long.MIN_VALUE cannot overflow Math.abs.
        final long hr = Math.abs(TimeUnit.MILLISECONDS.toHours(l));
        final long min = Math.abs(TimeUnit.MILLISECONDS.toMinutes(l) % 60);
        final long sec = Math.abs(TimeUnit.MILLISECONDS.toSeconds(l) % 60);
        final long ms = Math.abs(l % 1000);
        final String sign = l < 0 ? "-" : "";
        return sign + String.format("%02d:%02d:%02d.%03d", hr, min, sec, ms);
    }
}