001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.nio.charset.StandardCharsets;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.lib.stream.codec.Codec;
029import org.nuxeo.lib.stream.computation.AbstractComputation;
030import org.nuxeo.lib.stream.computation.ComputationContext;
031import org.nuxeo.lib.stream.computation.Record;
032import org.nuxeo.lib.stream.computation.Watermark;
033import org.nuxeo.lib.stream.log.Latency;
034import org.nuxeo.lib.stream.log.LogManager;
035import org.nuxeo.lib.stream.log.Name;
036import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
037
038/**
039 * A computation that sends periodically latencies information into a Log.
040 *
041 * @since 10.1
042 */
043public class LatencyTrackerComputation extends AbstractComputation {
044    private static final Log log = LogFactory.getLog(LatencyTrackerComputation.class);
045
046    protected static final String OUTPUT_STREAM = "o1";
047
048    protected final LogManager manager;
049
050    protected final List<Name> logNames;
051
052    protected final int intervalMs;
053
054    protected final int count;
055
056    protected final boolean verbose;
057
058    protected final Codec<Record> codec;
059
060    protected int remaining;
061
062    protected final List<LogPartitionGroup> logGroups = new ArrayList<>();
063
064    protected int refreshGroupCounter;
065
066    public LatencyTrackerComputation(LogManager manager, List<Name> logNames, String computationName,
067            int intervalSecond, int count, boolean verbose, Codec<Record> codec, int outputStream) {
068        super(computationName, 1, outputStream);
069        this.manager = manager;
070        this.logNames = logNames;
071        this.intervalMs = 1000 * intervalSecond;
072        this.count = count;
073        this.remaining = count;
074        this.verbose = verbose;
075        this.codec = codec;
076    }
077
078    @Override
079    public void init(ComputationContext context) {
080        log.info(String.format("Tracking %d streams: %s, count: %d, interval: %dms", logNames.size(),
081                Arrays.toString(logNames.toArray()), count,
082                intervalMs));
083        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
084    }
085
086    @Override
087    public void processTimer(ComputationContext context, String key, long timestamp) {
088        if (remaining == 0) {
089            if (verbose) {
090                log.info("Exiting after " + count + " captures");
091            }
092            context.askForTermination();
093            return;
094        }
095        if (verbose) {
096            log.info(String.format("Tracking latency %d/%d", count - remaining, count));
097        }
098        List<LogPartitionGroup> toRemove = new ArrayList<>();
099        for (LogPartitionGroup logGroup : getLogGroup()) {
100            try {
101                List<Latency> latencies = manager.getLatencyPerPartition(logGroup.name, logGroup.group, codec,
102                        (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey));
103                if (!latencies.isEmpty()) {
104                    processLatencies(context, logGroup, latencies);
105                }
106            } catch (Exception e) {
107                if (e.getCause() instanceof ClassNotFoundException || e.getCause() instanceof ClassCastException
108                        || e instanceof IllegalStateException || e instanceof IllegalArgumentException) {
109                    log.warn("log does not contains computation Record, removing partition: " + logGroup);
110                    toRemove.add(logGroup);
111                    continue;
112                }
113                throw e;
114            }
115        }
116        context.askForCheckpoint();
117        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
118        remaining--;
119        if (!toRemove.isEmpty()) {
120            logGroups.removeAll(toRemove);
121            if (logGroups.isEmpty()) {
122                log.error("Exiting because all logs have been skipped");
123                context.askForTermination();
124            }
125        }
126    }
127
128    protected List<LogPartitionGroup> getLogGroup() {
129        if (logGroups.isEmpty() || refreshGroup()) {
130            logGroups.clear();
131            logNames.forEach(name -> {
132                for (Name group : manager.listConsumerGroups(name)) {
133                    logGroups.add(new LogPartitionGroup(group, name, 0));
134                }
135            });
136            if (verbose) {
137                log.info("Update list of consumers: " + Arrays.toString(logGroups.toArray()));
138            }
139        }
140        return logGroups;
141    }
142
143    protected boolean refreshGroup() {
144        refreshGroupCounter += 1;
145        return (refreshGroupCounter % 5) == 0;
146    }
147
148    protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) {
149        for (int partition = 0; partition < latencies.size(); partition++) {
150            Latency latency = latencies.get(partition);
151            if (latency.lower() <= 0) {
152                // lower is the watermark timestamp for the latest processed record, without this info we cannot do
153                // anything
154                continue;
155            }
156            // upper is the time when the latency has been measured it is used as the watermark
157            long recordWatermark = Watermark.ofTimestamp(latency.upper()).getValue();
158            String recordKey = encodeKey(logGroup, partition);
159            byte[] recordValue = encodeLatency(latency);
160            Record record = new Record(recordKey, recordValue, recordWatermark);
161            if (verbose) {
162                log.info("out: " + record);
163            }
164            context.produceRecord(OUTPUT_STREAM, record);
165            context.setSourceLowWatermark(recordWatermark);
166        }
167    }
168
169    protected byte[] encodeLatency(Latency latency) {
170        return latency.asJson().getBytes(StandardCharsets.UTF_8);
171    }
172
173    public static String encodeKey(LogPartitionGroup logGroup, int partition) {
174        return String.format("%s:%s:%s", logGroup.group.getId(), logGroup.name.getId(), partition);
175    }
176
177    public static LogPartitionGroup decodeKey(String key) {
178        String[] parts = key.split(":");
179        return new LogPartitionGroup(Name.ofId(parts[0]), Name.ofId(parts[1]), Integer.parseInt(parts[2]));
180    }
181
182    @Override
183    public void destroy() {
184        log.info("Good bye");
185    }
186
187    @Override
188    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
189        log.error("Receiving a record is not expected: " + record);
190    }
191
192}