001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.UnsupportedEncodingException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.lib.stream.codec.Codec; 030import org.nuxeo.lib.stream.computation.AbstractComputation; 031import org.nuxeo.lib.stream.computation.ComputationContext; 032import org.nuxeo.lib.stream.computation.Record; 033import org.nuxeo.lib.stream.computation.Watermark; 034import org.nuxeo.lib.stream.log.Latency; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 037 038/** 039 * A computation that sends periodically latencies information into a Log. 040 * 041 * @since 10.1 042 */ 043public class LatencyTrackerComputation extends AbstractComputation { 044 private static final Log log = LogFactory.getLog(LatencyTrackerComputation.class); 045 046 protected static final String OUTPUT_STREAM = "o1"; 047 048 protected final LogManager manager; 049 050 protected final List<String> logNames; 051 052 protected final int intervalMs; 053 054 protected final int count; 055 056 protected final boolean verbose; 057 058 protected final Codec<Record> codec; 059 060 protected int remaining; 061 062 protected List<LogPartitionGroup> logGroups; 063 064 public LatencyTrackerComputation(LogManager manager, List<String> logNames, String computationName, 065 int intervalSecond, int count, boolean verbose, Codec<Record> codec) { 066 super(computationName, 1, 1); 067 this.manager = manager; 068 this.logNames = logNames; 069 this.intervalMs = 1000 * intervalSecond; 070 this.count = count; 071 this.remaining = count; 072 this.verbose = verbose; 073 this.codec = codec; 074 } 075 076 @Override 077 public void init(ComputationContext context) { 078 info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(logNames.toArray()), count, 079 intervalMs)); 080 logGroups = new ArrayList<>(); 081 logNames.forEach(name -> { 082 for (String group : manager.listConsumerGroups(name)) { 083 logGroups.add(new LogPartitionGroup(group, name, 0)); 084 } 085 }); 086 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 087 } 088 089 @Override 090 public void processTimer(ComputationContext context, String key, long timestamp) { 091 if (remaining == 0) { 092 debug("Exiting after " + count + " captures"); 093 context.askForTermination(); 094 return; 095 } 096 debug(String.format("Tracking latency %d/%d", count - remaining, count)); 097 for (LogPartitionGroup logGroup : logGroups) { 098 List<Latency> latencies = getLatenciesForPartition(logGroup, codec); 099 if (latencies.isEmpty()) { 100 continue; 101 } 102 for (int partition = 0; partition < latencies.size(); partition++) { 103 Latency latency = latencies.get(partition); 104 if (latency.lower() <= 0) { 105 // lower is the watermark timestamp for the latest processed record, without this info we cannot do 106 // anything 107 continue; 108 } 109 // upper is the time when the latency has been measured it is used as the watermark 110 long recordWatermark = Watermark.ofTimestamp(latency.upper()).getValue(); 111 String recordKey = encodeKey(logGroup, partition); 112 byte[] recordValue = encodeLatency(latency); 113 Record record = new Record(recordKey, recordValue, recordWatermark); 114 if (verbose) { 115 debug("out: " + record); 116 } 117 context.produceRecord(OUTPUT_STREAM, record); 118 context.setSourceLowWatermark(recordWatermark); 119 } 120 } 121 context.askForCheckpoint(); 122 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 123 remaining--; 124 } 125 126 protected byte[] encodeLatency(Latency latency) { 127 try { 128 return latency.asJson().getBytes("UTF-8"); 129 } catch (UnsupportedEncodingException e) { 130 throw new IllegalStateException("Failed to byte encoding " + latency, e); 131 } 132 } 133 134 @SuppressWarnings("squid:S1193") 135 protected List<Latency> getLatenciesForPartition(LogPartitionGroup logGroup, Codec<Record> codec) { 136 try { 137 return manager.getLatencyPerPartition(logGroup.name, logGroup.group, codec, 138 (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)); 139 } catch (Exception e) { 140 if (e.getCause() instanceof ClassNotFoundException || e instanceof IllegalStateException) { 141 error("log does not contains Record, remove partition: " + logGroup); 142 return Collections.emptyList(); 143 } 144 throw e; 145 } 146 } 147 148 public static String encodeKey(LogPartitionGroup logGroup, int partition) { 149 return String.format("%s:%s:%s", logGroup.group, logGroup.name, partition); 150 } 151 152 public static LogPartitionGroup decodeKey(String key) { 153 String[] parts = key.split(":"); 154 return new LogPartitionGroup(parts[0], parts[1], Integer.parseInt(parts[2])); 155 } 156 157 @Override 158 public void destroy() { 159 info("Good bye"); 160 } 161 162 @Override 163 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 164 error("Receiving a record is not expected!: " + record); 165 } 166 167 protected void debug(String msg) { 168 if (verbose) { 169 log.info(msg); 170 } 171 } 172 173 protected void info(String msg) { 174 log.info(msg); 175 } 176 177 protected void error(String msg) { 178 log.error(msg); 179 } 180}