001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
033
034/**
035 * A computation that sends periodically latencies information.
036 *
037 * @since 10.1
038 */
039public class LatencyTrackerComputation extends AbstractComputation {
040
041    protected static final String OUTPUT_STREAM = "o1";
042
043    protected final LogManager manager;
044
045    protected final List<String> logNames;
046
047    protected final int intervalMs;
048
049    protected final int count;
050
051    protected final boolean verbose;
052
053    protected int remaining;
054
055    protected List<LogPartitionGroup> logGroups;
056
057    public LatencyTrackerComputation(LogManager manager, List<String> logNames, String computationName,
058            int intervalSecond, int count, boolean verbose) {
059        super(computationName, 1, 1);
060        this.manager = manager;
061        this.logNames = logNames;
062        this.intervalMs = 1000 * intervalSecond;
063        this.count = count;
064        this.remaining = count;
065        this.verbose = verbose;
066    }
067
068    @Override
069    public void init(ComputationContext context) {
070        info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(logNames.toArray()), count,
071                intervalMs));
072        logGroups = new ArrayList<>();
073        logNames.forEach(name -> {
074            for (String group : manager.listConsumerGroups(name)) {
075                logGroups.add(new LogPartitionGroup(group, name, 0));
076            }
077        });
078        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
079    }
080
081    @Override
082    public void processTimer(ComputationContext context, String key, long timestamp) {
083        if (remaining == 0) {
084            debug("Exiting after " + count + " captures");
085            context.askForTermination();
086            return;
087        }
088        debug(String.format("Tracking latency %d/%d", count - remaining, count));
089        for (LogPartitionGroup logGroup : logGroups) {
090            List<Latency> latencies;
091            try {
092                latencies = manager.<Record> getLatencyPerPartition(logGroup.name, logGroup.group,
093                        (rec -> Watermark.ofValue(rec.watermark).getTimestamp()), (rec -> rec.key));
094            } catch (IllegalStateException e) {
095                error("log does not contains Record: " + logGroup);
096                continue;
097            }
098            int partition = 0;
099            for (Latency latency : latencies) {
100                String recordKey = encodeKey(logGroup, partition);
101                byte[] value;
102                try {
103                    value = latency.asJson().getBytes("UTF-8");
104                } catch (UnsupportedEncodingException e) {
105                    throw new IllegalStateException("Faild to byte encoding " + latency, e);
106                }
107                Record record = new Record(recordKey, value, Watermark.ofTimestamp(latency.upper()).getValue(), null);
108                debug("out: " + record);
109                context.produceRecord(OUTPUT_STREAM, record);
110                context.setSourceLowWatermark(latency.upper());
111                partition++;
112            }
113        }
114        context.askForCheckpoint();
115        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
116        remaining--;
117    }
118
119    public static String encodeKey(LogPartitionGroup logGroup, int partition) {
120        return String.format("%s:%s:%s", logGroup.group, logGroup.name, partition);
121    }
122
123    public static LogPartitionGroup decodeKey(String key) {
124        String[] parts = key.split(":");
125        return new LogPartitionGroup(parts[0], parts[1], Integer.parseInt(parts[2]));
126    }
127
128    @Override
129    public void destroy() {
130        info("Good bye");
131    }
132
133    @Override
134    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
135        error("Receiving a record is not expected!: " + record);
136    }
137
138    protected void debug(String msg) {
139        if (verbose) {
140            System.out.println(msg);
141        }
142    }
143
144    protected void info(String msg) {
145        System.out.println(msg);
146    }
147
148    protected void error(String msg) {
149        System.err.println(msg);
150    }
151}