/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.time.Instant;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.cli.CommandLine;
027import org.apache.commons.cli.Option;
028import org.apache.commons.cli.Options;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.lib.stream.codec.Codec;
032import org.nuxeo.lib.stream.computation.Record;
033import org.nuxeo.lib.stream.computation.Watermark;
034import org.nuxeo.lib.stream.log.Latency;
035import org.nuxeo.lib.stream.log.LogManager;
036
037/**
038 * Display the current latencies of consumers.
039 *
040 * @since 9.3
041 */
042public class LatencyCommand extends Command {
043    private static final Log log = LogFactory.getLog(LatencyCommand.class);
044
045    protected static final String NAME = "latency";
046
047    protected boolean verbose = false;
048
049    @Override
050    public String name() {
051        return NAME;
052    }
053
054    @Override
055    public void updateOptions(Options options) {
056        options.addOption(Option.builder("l")
057                                .longOpt("log-name")
058                                .desc("Log name of a stream containing computation.Record")
059                                .hasArg()
060                                .argName("LOG_NAME")
061                                .build());
062        options.addOption(Option.builder()
063                                .longOpt("codec")
064                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
065                                .hasArg()
066                                .argName("CODEC")
067                                .build());
068        options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build());
069    }
070
071    @Override
072    public boolean run(LogManager manager, CommandLine cmd) {
073        String name = cmd.getOptionValue("log-name");
074        Codec<Record> codec = getRecordCodec(cmd.getOptionValue("codec"));
075        verbose = cmd.hasOption("verbose");
076        if (name != null) {
077            latency(manager, name, codec);
078        } else {
079            latency(manager, codec);
080        }
081        return true;
082    }
083
084    protected void latency(LogManager manager, Codec<Record> codec) {
085        log.info("# " + manager);
086        for (String name : manager.listAll()) {
087            latency(manager, name, codec);
088        }
089    }
090
091    protected void latency(LogManager manager, String name, Codec<Record> codec) {
092        log.info("## Log: " + name + " partitions: " + manager.size(name));
093        List<String> consumers = manager.listConsumerGroups(name);
094        if (verbose && consumers.isEmpty()) {
095            // add a fake group to get info on end positions
096            consumers.add("tools");
097        }
098        try {
099            consumers.forEach(group -> renderLatency(group, manager.<Record> getLatencyPerPartition(name, group, codec,
100                    (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey))));
101        } catch (IllegalStateException e) {
102            // happen when this is not a stream of Record
103            log.error(e.getMessage());
104        }
105    }
106
107    protected void renderLatency(String group, List<Latency> latencies) {
108        log.info(String.format("### Group: %s", group));
109        log.info(
110                "| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n"
111                        + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |");
112        Latency all = Latency.of(latencies);
113        log.info(String.format("|All|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", all.lag().lag(), all.latency(),
114                formatInterval(all.latency()), all.lower(), formatDate(all.lower()), formatDate(all.upper()),
115                all.lag().lower(), all.lag().upper(), all.lag().lowerOffset(), all.lag().upperOffset(), all.key()));
116        if (verbose && latencies.size() > 1) {
117            AtomicInteger i = new AtomicInteger();
118            latencies.forEach(lat -> log.info(String.format("|%d|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|",
119                    i.getAndIncrement(), lat.lag().lag(), lat.latency(), formatInterval(lat.latency()), lat.lower(),
120                    formatDate(lat.lower()), formatDate(lat.upper()), lat.lag().lower(), lat.lag().upper(),
121                    lat.lag().lowerOffset(), lat.lag().upperOffset(), lat.key())));
122        }
123    }
124
125    protected String formatDate(long timestamp) {
126        if (timestamp > 0) {
127            return Instant.ofEpochMilli(timestamp).toString();
128        }
129        return "NA";
130    }
131
132    protected static String formatInterval(final long l) {
133        if (l == 0) {
134            return "NA";
135        }
136        final long hr = TimeUnit.MILLISECONDS.toHours(l);
137        final long min = TimeUnit.MILLISECONDS.toMinutes(l - TimeUnit.HOURS.toMillis(hr));
138        final long sec = TimeUnit.MILLISECONDS.toSeconds(
139                l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min));
140        final long ms = TimeUnit.MILLISECONDS.toMillis(
141                l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min) - TimeUnit.SECONDS.toMillis(sec));
142        return String.format("%02d:%02d:%02d.%03d", hr, min, sec, ms);
143    }
144}