001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.nio.charset.StandardCharsets;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.lib.stream.codec.Codec;
030import org.nuxeo.lib.stream.computation.AbstractComputation;
031import org.nuxeo.lib.stream.computation.ComputationContext;
032import org.nuxeo.lib.stream.computation.Record;
033import org.nuxeo.lib.stream.computation.Watermark;
034import org.nuxeo.lib.stream.log.Latency;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
037
038/**
039 * A computation that sends periodically latencies information into a Log.
040 *
041 * @since 10.1
042 */
043public class LatencyTrackerComputation extends AbstractComputation {
044    private static final Log log = LogFactory.getLog(LatencyTrackerComputation.class);
045
046    protected static final String OUTPUT_STREAM = "o1";
047
048    protected final LogManager manager;
049
050    protected final List<String> logNames;
051
052    protected final int intervalMs;
053
054    protected final int count;
055
056    protected final boolean verbose;
057
058    protected final Codec<Record> codec;
059
060    protected int remaining;
061
062    protected List<LogPartitionGroup> logGroups;
063
064    public LatencyTrackerComputation(LogManager manager, List<String> logNames, String computationName,
065            int intervalSecond, int count, boolean verbose, Codec<Record> codec) {
066        super(computationName, 1, 1);
067        this.manager = manager;
068        this.logNames = logNames;
069        this.intervalMs = 1000 * intervalSecond;
070        this.count = count;
071        this.remaining = count;
072        this.verbose = verbose;
073        this.codec = codec;
074    }
075
076    @Override
077    public void init(ComputationContext context) {
078        info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(logNames.toArray()), count,
079                intervalMs));
080        logGroups = new ArrayList<>();
081        logNames.forEach(name -> {
082            for (String group : manager.listConsumerGroups(name)) {
083                logGroups.add(new LogPartitionGroup(group, name, 0));
084            }
085        });
086        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
087    }
088
089    @Override
090    public void processTimer(ComputationContext context, String key, long timestamp) {
091        if (remaining == 0) {
092            debug("Exiting after " + count + " captures");
093            context.askForTermination();
094            return;
095        }
096        debug(String.format("Tracking latency %d/%d", count - remaining, count));
097        for (LogPartitionGroup logGroup : logGroups) {
098            List<Latency> latencies = getLatenciesForPartition(logGroup, codec);
099            if (latencies.isEmpty()) {
100                continue;
101            }
102            for (int partition = 0; partition < latencies.size(); partition++) {
103                Latency latency = latencies.get(partition);
104                if (latency.lower() <= 0) {
105                    // lower is the watermark timestamp for the latest processed record, without this info we cannot do
106                    // anything
107                    continue;
108                }
109                // upper is the time when the latency has been measured it is used as the watermark
110                long recordWatermark = Watermark.ofTimestamp(latency.upper()).getValue();
111                String recordKey = encodeKey(logGroup, partition);
112                byte[] recordValue = encodeLatency(latency);
113                Record record = new Record(recordKey, recordValue, recordWatermark);
114                if (verbose) {
115                    debug("out: " + record);
116                }
117                context.produceRecord(OUTPUT_STREAM, record);
118                context.setSourceLowWatermark(recordWatermark);
119            }
120        }
121        context.askForCheckpoint();
122        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
123        remaining--;
124    }
125
126    protected byte[] encodeLatency(Latency latency) {
127        return latency.asJson().getBytes(StandardCharsets.UTF_8);
128    }
129
130    @SuppressWarnings("squid:S1193")
131    protected List<Latency> getLatenciesForPartition(LogPartitionGroup logGroup, Codec<Record> codec) {
132        try {
133            return manager.getLatencyPerPartition(logGroup.name, logGroup.group, codec,
134                    (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey));
135        } catch (Exception e) {
136            if (e.getCause() instanceof ClassNotFoundException || e instanceof IllegalStateException) {
137                error("log does not contains Record, remove partition: " + logGroup);
138                return Collections.emptyList();
139            }
140            throw e;
141        }
142    }
143
144    public static String encodeKey(LogPartitionGroup logGroup, int partition) {
145        return String.format("%s:%s:%s", logGroup.group, logGroup.name, partition);
146    }
147
148    public static LogPartitionGroup decodeKey(String key) {
149        String[] parts = key.split(":");
150        return new LogPartitionGroup(parts[0], parts[1], Integer.parseInt(parts[2]));
151    }
152
153    @Override
154    public void destroy() {
155        info("Good bye");
156    }
157
158    @Override
159    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
160        error("Receiving a record is not expected!: " + record);
161    }
162
163    protected void debug(String msg) {
164        if (verbose) {
165            log.info(msg);
166        }
167    }
168
169    protected void info(String msg) {
170        log.info(msg);
171    }
172
173    protected void error(String msg) {
174        log.error(msg);
175    }
176}