001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.UnsupportedEncodingException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.List; 025 026import org.nuxeo.lib.stream.computation.AbstractComputation; 027import org.nuxeo.lib.stream.computation.ComputationContext; 028import org.nuxeo.lib.stream.computation.Record; 029import org.nuxeo.lib.stream.computation.Watermark; 030import org.nuxeo.lib.stream.log.Latency; 031import org.nuxeo.lib.stream.log.LogManager; 032import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 033 034/** 035 * A computation that sends periodically latencies information. 036 * 037 * @since 10.1 038 */ 039public class LatencyTrackerComputation extends AbstractComputation { 040 041 protected static final String OUTPUT_STREAM = "o1"; 042 043 protected final LogManager manager; 044 045 protected final List<String> logNames; 046 047 protected final int intervalMs; 048 049 protected final int count; 050 051 protected final boolean verbose; 052 053 protected int remaining; 054 055 protected List<LogPartitionGroup> logGroups; 056 057 public LatencyTrackerComputation(LogManager manager, List<String> logNames, String computationName, 058 int intervalSecond, int count, boolean verbose) { 059 super(computationName, 1, 1); 060 this.manager = manager; 061 this.logNames = logNames; 062 this.intervalMs = 1000 * intervalSecond; 063 this.count = count; 064 this.remaining = count; 065 this.verbose = verbose; 066 } 067 068 @Override 069 public void init(ComputationContext context) { 070 info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(logNames.toArray()), count, 071 intervalMs)); 072 logGroups = new ArrayList<>(); 073 logNames.forEach(name -> { 074 for (String group : manager.listConsumerGroups(name)) { 075 logGroups.add(new LogPartitionGroup(group, name, 0)); 076 } 077 }); 078 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 079 } 080 081 @Override 082 public void processTimer(ComputationContext context, String key, long timestamp) { 083 if (remaining == 0) { 084 debug("Exiting after " + count + " captures"); 085 context.askForTermination(); 086 return; 087 } 088 debug(String.format("Tracking latency %d/%d", count - remaining, count)); 089 for (LogPartitionGroup logGroup : logGroups) { 090 List<Latency> latencies; 091 try { 092 latencies = manager.<Record> getLatencyPerPartition(logGroup.name, logGroup.group, 093 (rec -> Watermark.ofValue(rec.watermark).getTimestamp()), (rec -> rec.key)); 094 } catch (IllegalStateException e) { 095 error("log does not contains Record: " + logGroup); 096 continue; 097 } 098 int partition = 0; 099 for (Latency latency : latencies) { 100 String recordKey = encodeKey(logGroup, partition); 101 byte[] value; 102 try { 103 value = latency.asJson().getBytes("UTF-8"); 104 } catch (UnsupportedEncodingException e) { 105 throw new IllegalStateException("Faild to byte encoding " + latency, e); 106 } 107 Record record = new Record(recordKey, value, Watermark.ofTimestamp(latency.upper()).getValue(), null); 108 debug("out: " + record); 109 context.produceRecord(OUTPUT_STREAM, record); 110 context.setSourceLowWatermark(latency.upper()); 111 partition++; 112 } 113 } 114 context.askForCheckpoint(); 115 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 116 remaining--; 117 } 118 119 public static String encodeKey(LogPartitionGroup logGroup, int partition) { 120 return String.format("%s:%s:%s", logGroup.group, logGroup.name, partition); 121 } 122 123 public static LogPartitionGroup decodeKey(String key) { 124 String[] parts = key.split(":"); 125 return new LogPartitionGroup(parts[0], parts[1], Integer.parseInt(parts[2])); 126 } 127 128 @Override 129 public void destroy() { 130 info("Good bye"); 131 } 132 133 @Override 134 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 135 error("Receiving a record is not expected!: " + record); 136 } 137 138 protected void debug(String msg) { 139 if (verbose) { 140 System.out.println(msg); 141 } 142 } 143 144 protected void info(String msg) { 145 System.out.println(msg); 146 } 147 148 protected void error(String msg) { 149 System.err.println(msg); 150 } 151}