001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.time.Instant;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.cli.CommandLine;
027import org.apache.commons.cli.Option;
028import org.apache.commons.cli.Options;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.lib.stream.codec.Codec;
032import org.nuxeo.lib.stream.computation.Record;
033import org.nuxeo.lib.stream.computation.Watermark;
034import org.nuxeo.lib.stream.log.Latency;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.Name;
037
038/**
039 * Display the current latencies of consumers.
040 *
041 * @since 9.3
042 */
043public class LatencyCommand extends Command {
044    private static final Log log = LogFactory.getLog(LatencyCommand.class);
045
046    protected static final String NAME = "latency";
047
048    protected boolean verbose = false;
049
050    @Override
051    public String name() {
052        return NAME;
053    }
054
055    @Override
056    public void updateOptions(Options options) {
057        options.addOption(Option.builder("l")
058                                .longOpt("log-name")
059                                .desc("Log name of a stream containing computation.Record")
060                                .hasArg()
061                                .argName("LOG_NAME")
062                                .build());
063        options.addOption(Option.builder()
064                                .longOpt("codec")
065                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
066                                .hasArg()
067                                .argName("CODEC")
068                                .build());
069        options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build());
070    }
071
072    @Override
073    public boolean run(LogManager manager, CommandLine cmd) {
074        String logName = cmd.getOptionValue("log-name");
075        Codec<Record> codec = getRecordCodec(cmd.getOptionValue("codec"));
076        verbose = cmd.hasOption("verbose");
077        if (logName != null) {
078            latency(manager, Name.ofUrn(logName), codec);
079        } else {
080            latency(manager, codec);
081        }
082        return true;
083    }
084
085    protected void latency(LogManager manager, Codec<Record> codec) {
086        log.info("# " + manager);
087        for (Name name : manager.listAllNames()) {
088            latency(manager, name, codec);
089        }
090    }
091
092    protected void latency(LogManager manager, Name name, Codec<Record> codec) {
093        log.info("## Log: " + name + " partitions: " + manager.size(name));
094        List<Name> consumers = manager.listConsumerGroups(name);
095        if (verbose && consumers.isEmpty()) {
096            // add a fake group to get info on end positions
097            consumers.add(Name.ofUrn("admin/tools"));
098        }
099        try {
100            consumers.forEach(group -> renderLatency(group, manager.<Record> getLatencyPerPartition(name, group, codec,
101                    (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey))));
102        } catch (IllegalStateException e) {
103            // happen when this is not a stream of Record
104            log.error(e.getMessage());
105        }
106    }
107
108    protected void renderLatency(Name group, List<Latency> latencies) {
109        log.info(String.format("### Group: %s", group));
110        log.info(
111                "| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n"
112                        + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |");
113        Latency all = Latency.of(latencies);
114        log.info(String.format("|All|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", all.lag().lag(), all.latency(),
115                formatInterval(all.latency()), all.lower(), formatDate(all.lower()), formatDate(all.upper()),
116                all.lag().lower(), all.lag().upper(), all.lag().lowerOffset(), all.lag().upperOffset(), all.key()));
117        if (verbose && latencies.size() > 1) {
118            AtomicInteger i = new AtomicInteger();
119            latencies.forEach(lat -> log.info(String.format("|%d|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|",
120                    i.getAndIncrement(), lat.lag().lag(), lat.latency(), formatInterval(lat.latency()), lat.lower(),
121                    formatDate(lat.lower()), formatDate(lat.upper()), lat.lag().lower(), lat.lag().upper(),
122                    lat.lag().lowerOffset(), lat.lag().upperOffset(), lat.key())));
123        }
124    }
125
126    protected String formatDate(long timestamp) {
127        if (timestamp > 0) {
128            return Instant.ofEpochMilli(timestamp).toString();
129        }
130        return "NA";
131    }
132
133    protected static String formatInterval(final long l) {
134        if (l == 0) {
135            return "NA";
136        }
137        final long hr = TimeUnit.MILLISECONDS.toHours(l);
138        final long min = TimeUnit.MILLISECONDS.toMinutes(l - TimeUnit.HOURS.toMillis(hr));
139        final long sec = TimeUnit.MILLISECONDS.toSeconds(
140                l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min));
141        final long ms = TimeUnit.MILLISECONDS.toMillis(
142                l - TimeUnit.HOURS.toMillis(hr) - TimeUnit.MINUTES.toMillis(min) - TimeUnit.SECONDS.toMillis(sec));
143        return String.format("%02d:%02d:%02d.%03d", hr, min, sec, ms);
144    }
145}