001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.time.Instant; 022import java.util.List; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.cli.CommandLine; 027import org.apache.commons.cli.Option; 028import org.apache.commons.cli.Options; 029import org.nuxeo.lib.stream.computation.Record; 030import org.nuxeo.lib.stream.computation.Watermark; 031import org.nuxeo.lib.stream.log.Latency; 032import org.nuxeo.lib.stream.log.LogManager; 033 034/** 035 * @since 9.3 036 */ 037public class LatencyCommand extends Command { 038 039 protected static final String NAME = "latency"; 040 041 protected boolean verbose = false; 042 043 @Override 044 public String name() { 045 return NAME; 046 } 047 048 @Override 049 public void updateOptions(Options options) { 050 options.addOption(Option.builder("l") 051 .longOpt("log-name") 052 .desc("Log name of a stream containing computation.Record") 053 .hasArg() 054 .argName("LOG_NAME") 055 .build()); 056 options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build()); 057 } 058 059 @Override 060 public boolean run(LogManager manager, CommandLine cmd) { 061 String name = cmd.getOptionValue("log-name"); 062 verbose = cmd.hasOption("verbose"); 063 if (name != null) { 064 latency(manager, name); 065 } else { 066 latency(manager); 067 } 068 return true; 069 } 070 071 protected void latency(LogManager manager) { 072 System.out.println("# " + manager); 073 for (String name : manager.listAll()) { 074 latency(manager, name); 075 } 076 } 077 078 protected void latency(LogManager manager, String name) { 079 System.out.println("## Log: " + name + " partitions: " + manager.getAppender(name).size()); 080 List<String> consumers = manager.listConsumerGroups(name); 081 if (verbose && consumers.isEmpty()) { 082 // add a fake group to get info on end positions 083 consumers.add("tools"); 084 } 085 try { 086 consumers.forEach(group -> renderLatency(group, manager.<Record> getLatencyPerPartition(name, group, 087 (rec -> Watermark.ofValue(rec.watermark).getTimestamp()), (rec -> rec.key)))); 088 } catch (IllegalStateException e) { 089 // happen when this is not a stream of Record 090 System.err.println(e.getMessage()); 091 } 092 } 093 094 protected void renderLatency(String group, List<Latency> latencies) { 095 System.out.println(String.format("### Group: %s", group)); 096 System.out.println( 097 "| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n" 098 + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |"); 099 Latency all = Latency.of(latencies); 100 System.out.println(String.format("|All|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", all.lag().lag(), all.latency(), 101 formatInterval(all.latency()), all.lower(), formatDate(all.lower()), formatDate(all.upper()), 102 all.lag().lower(), all.lag().upper(), all.lag().lowerOffset(), all.lag().upperOffset(), all.key())); 103 if (verbose && latencies.size() > 1) { 104 AtomicInteger i = new AtomicInteger(); 105 latencies.forEach(lat -> System.out.println(String.format("|%d|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", 106 i.getAndIncrement(), lat.lag().lag(), lat.latency(), formatInterval(lat.latency()), lat.lower(), 107 formatDate(lat.lower()), formatDate(lat.upper()), lat.lag().lower(), lat.lag().upper(), 108 lat.lag().lowerOffset(), lat.lag().upperOffset(), lat.key()))); 109 } 110 } 111 112 protected String formatDate(long timestamp) { 113 if (timestamp > 0) { 114 return Instant.ofEpochMilli(timestamp).toString(); 115 } 116 return "NA"; 117 } 118 119 protected static String formatInterval(final long l) { 120 if (l == 0) { 121 return "NA"; 122 } 123 final long hr = TimeUnit.MILLISECONDS.toHours(l); 124 final long min = TimeUnit.MILLISECONDS.toMinutes(l - TimeUnit.HOURS.toMillis(hr)); 125 final long sec = TimeUnit.MILLISECONDS.toSeconds( 126 l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min)); 127 final long ms = TimeUnit.MILLISECONDS.toMillis( 128 l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min) - TimeUnit.SECONDS.toMillis(sec)); 129 return String.format("%02d:%02d:%02d.%03d", hr, min, sec, ms); 130 } 131}