001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.time.Instant; 022import java.util.List; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.cli.CommandLine; 027import org.apache.commons.cli.Option; 028import org.apache.commons.cli.Options; 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.lib.stream.codec.Codec; 032import org.nuxeo.lib.stream.computation.Record; 033import org.nuxeo.lib.stream.computation.Watermark; 034import org.nuxeo.lib.stream.log.Latency; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.Name; 037 038/** 039 * Display the current latencies of consumers. 040 * 041 * @since 9.3 042 */ 043public class LatencyCommand extends Command { 044 private static final Log log = LogFactory.getLog(LatencyCommand.class); 045 046 protected static final String NAME = "latency"; 047 048 protected boolean verbose = false; 049 050 @Override 051 public String name() { 052 return NAME; 053 } 054 055 @Override 056 public void updateOptions(Options options) { 057 options.addOption(Option.builder("l") 058 .longOpt("log-name") 059 .desc("Log name of a stream containing computation.Record") 060 .hasArg() 061 .argName("LOG_NAME") 062 .build()); 063 options.addOption(Option.builder() 064 .longOpt("codec") 065 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 066 .hasArg() 067 .argName("CODEC") 068 .build()); 069 options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build()); 070 } 071 072 @Override 073 public boolean run(LogManager manager, CommandLine cmd) { 074 String logName = cmd.getOptionValue("log-name"); 075 Codec<Record> codec = getRecordCodec(cmd.getOptionValue("codec")); 076 verbose = cmd.hasOption("verbose"); 077 if (logName != null) { 078 latency(manager, Name.ofUrn(logName), codec); 079 } else { 080 latency(manager, codec); 081 } 082 return true; 083 } 084 085 protected void latency(LogManager manager, Codec<Record> codec) { 086 log.info("# " + manager); 087 for (Name name : manager.listAllNames()) { 088 latency(manager, name, codec); 089 } 090 } 091 092 protected void latency(LogManager manager, Name name, Codec<Record> codec) { 093 log.info("## Log: " + name + " partitions: " + manager.size(name)); 094 List<Name> consumers = manager.listConsumerGroups(name); 095 if (verbose && consumers.isEmpty()) { 096 // add a fake group to get info on end positions 097 consumers.add(Name.ofUrn("admin/tools")); 098 } 099 try { 100 consumers.forEach(group -> renderLatency(group, manager.<Record> getLatencyPerPartition(name, group, codec, 101 (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)))); 102 } catch (IllegalStateException e) { 103 // happen when this is not a stream of Record 104 log.error(e.getMessage()); 105 } 106 } 107 108 protected void renderLatency(Name group, List<Latency> latencies) { 109 log.info(String.format("### Group: %s", group)); 110 log.info( 111 "| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n" 112 + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |"); 113 Latency all = Latency.of(latencies); 114 log.info(String.format("|All|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", all.lag().lag(), all.latency(), 115 formatInterval(all.latency()), all.lower(), formatDate(all.lower()), formatDate(all.upper()), 116 all.lag().lower(), all.lag().upper(), all.lag().lowerOffset(), all.lag().upperOffset(), all.key())); 117 if (verbose && latencies.size() > 1) { 118 AtomicInteger i = new AtomicInteger(); 119 latencies.forEach(lat -> log.info(String.format("|%d|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", 120 i.getAndIncrement(), lat.lag().lag(), lat.latency(), formatInterval(lat.latency()), lat.lower(), 121 formatDate(lat.lower()), formatDate(lat.upper()), lat.lag().lower(), lat.lag().upper(), 122 lat.lag().lowerOffset(), lat.lag().upperOffset(), lat.key()))); 123 } 124 } 125 126 protected String formatDate(long timestamp) { 127 if (timestamp > 0) { 128 return Instant.ofEpochMilli(timestamp).toString(); 129 } 130 return "NA"; 131 } 132 133 protected static String formatInterval(final long l) { 134 if (l == 0) { 135 return "NA"; 136 } 137 final long hr = TimeUnit.MILLISECONDS.toHours(l); 138 final long min = TimeUnit.MILLISECONDS.toMinutes(l - TimeUnit.HOURS.toMillis(hr)); 139 final long sec = TimeUnit.MILLISECONDS.toSeconds( 140 l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min)); 141 final long ms = TimeUnit.MILLISECONDS.toMillis( 142 l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min) - TimeUnit.SECONDS.toMillis(sec)); 143 return String.format("%02d:%02d:%02d.%03d", hr, min, sec, ms); 144 } 145}