/*
 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.core.bulk.introspection;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import org.nuxeo.lib.stream.log.Name;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Converts a stream introspection JSON document into a PlantUML diagram.
 * <p>
 * The document is read from its top-level {@code streams}, {@code processors} and {@code metrics} arrays. A section
 * that is absent from the document is skipped.
 *
 * @since 11.5
 */
public class StreamIntrospectionConverter {
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    protected final String json;

    protected final JsonNode root;

    /**
     * Parses the given introspection document.
     *
     * @param json the introspection as a JSON string
     * @throws IllegalArgumentException if {@code json} cannot be parsed
     */
    public StreamIntrospectionConverter(String json) {
        this.json = json;
        try {
            this.root = OBJECT_MAPPER.readTree(json);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Invalid JSON: " + json, e);
        }
    }

    /**
     * Renders the introspection as a PlantUML document: a header, one {@code queue} per stream, then for each
     * processor one {@code component} per computation and one arrow per topology edge.
     *
     * @return the PlantUML source, enclosed in {@code @startuml} / {@code @enduml}
     */
    public String getPuml() {
        StringBuilder ret = new StringBuilder();
        ret.append("@startuml\n");
        Map<String, String> streamMetrics = parseMetrics();
        ret.append(getPumlHeader("Stream Introspection at " + streamMetrics.get("date")));
        // path() yields a "missing" node instead of null, so an absent section is skipped rather than throwing
        JsonNode streams = root.path("streams");
        if (streams.isArray()) {
            for (JsonNode item : streams) {
                dumpStream(ret, item, streamMetrics);
            }
        }
        JsonNode processors = root.path("processors");
        if (processors.isArray()) {
            for (JsonNode item : processors) {
                dumpProcessor(ret, item, streamMetrics);
            }
        }
        ret.append("@enduml\n");
        return ret.toString();
    }

    /**
     * Appends the computations and the topology edges of a single processor entry.
     *
     * @param ret the diagram being built
     * @param item one element of the {@code processors} array
     * @param streamMetrics the flattened metrics returned by {@link #parseMetrics()}
     */
    protected void dumpProcessor(StringBuilder ret, JsonNode item, Map<String, String> streamMetrics) {
        String host = item.at("/metadata/ip").asText();
        JsonNode computations = item.path("computations");
        if (computations.isArray()) {
            for (JsonNode computation : computations) {
                dumpComputation(host, ret, computation, streamMetrics);
            }
        }
        JsonNode topologies = item.path("topology");
        if (topologies.isArray()) {
            for (JsonNode topo : topologies) {
                // each edge is a [source, target] pair
                String source = topo.get(0).asText();
                String target = topo.get(1).asText();
                ret.append(String.format("%s==>%s%s%n", getPumlIdentifierForHost(host, source),
                        getPumlIdentifierForHost(host, target), getLagComment(source, target, streamMetrics)));
            }
        }
    }

    /**
     * Builds the label of a topology edge.
     *
     * @param source the edge source, as found in the topology
     * @param target the edge target, as found in the topology
     * @param metrics the flattened metrics returned by {@link #parseMetrics()}
     * @return the position/lag/latency label when the target is a computation with a known, non-zero lag, an empty
     *         string otherwise
     */
    protected String getLagComment(String source, String target, Map<String, String> metrics) {
        if (!target.startsWith("computation:")) {
            return "";
        }
        String stream = source.replace("stream:", "");
        String computation = target.replace("computation:", "");
        String prefix = stream + ":" + computation;
        String lag = metrics.get(prefix + ":lag");
        // provide info only when there is a lag
        if (lag == null || "0".equals(lag)) {
            return "";
        }
        return String.format(": %s/%s lag: %s, latency: %ss", metrics.get(prefix + ":pos"),
                getStreamEnd(metrics, stream), lag, metrics.get(prefix + ":latency"));
    }

    /**
     * Flattens the {@code metrics} section into a map of formatted values.
     * <p>
     * Stream level entries are keyed {@code <stream>:end} and {@code <stream>:<group>:lag|latency|pos}; computation
     * level entries are keyed {@code <computation>:<hostIp>:...}. The map always contains {@code timestamp} (the
     * highest host timestamp, 0 when there is none) and {@code date} (that timestamp read as epoch seconds).
     *
     * @return a new mutable map
     */
    protected Map<String, String> parseMetrics() {
        Map<String, String> streamMetrics = new HashMap<>();
        long timestamp = 0;
        JsonNode hosts = root.path("metrics");
        if (hosts.isArray()) {
            for (JsonNode host : hosts) {
                String hostIp = host.get("ip").asText();
                timestamp = Math.max(timestamp, host.get("timestamp").asLong());
                JsonNode hostMetrics = host.path("metrics");
                if (hostMetrics.isArray()) {
                    for (JsonNode metric : hostMetrics) {
                        parseMetric(streamMetrics, hostIp, metric);
                    }
                }
            }
        }
        streamMetrics.put("timestamp", String.valueOf(timestamp));
        streamMetrics.put("date", Instant.ofEpochSecond(timestamp).toString());
        return streamMetrics;
    }

    /**
     * Stores a single metric into {@code streamMetrics}; metrics with an unknown key are ignored.
     *
     * @param streamMetrics the map to fill
     * @param hostIp the ip of the host that reported the metric
     * @param metric the metric node
     */
    protected void parseMetric(Map<String, String> streamMetrics, String hostIp, JsonNode metric) {
        String key = metric.get("k").asText();
        if (metric.has("stream")) {
            parseStreamMetric(streamMetrics, key, metric);
        } else if (key.endsWith("processRecord")) {
            parseTimerMetric(streamMetrics, hostIp, "", metric);
        } else if (key.endsWith("processTimer")) {
            parseTimerMetric(streamMetrics, hostIp, ":timer", metric);
        } else if (key.endsWith("computation.failure")) {
            if (metric.get("v").asInt() > 0) {
                String computationName = Name.urnOfId(metric.get("computation").asText()) + ":" + hostIp;
                streamMetrics.put(computationName + ":failure", metric.get("v").asText());
            }
        } else if (key.endsWith("stream.failure")) {
            if (metric.get("v").asInt() > 0) {
                streamMetrics.put(hostIp + ":failure", metric.get("v").asText());
            }
        } else if (key.endsWith("computation.skippedRecord")) {
            if (metric.get("v").asInt() > 0) {
                String computationName = Name.urnOfId(metric.get("computation").asText()) + ":" + hostIp;
                streamMetrics.put(computationName + ":skipped", metric.get("v").asText());
            }
        }
    }

    /**
     * Stores a metric that carries a {@code stream} field (end, lag, latency or pos).
     *
     * @param streamMetrics the map to fill
     * @param key the value of the metric {@code k} field
     * @param metric the metric node
     */
    protected void parseStreamMetric(Map<String, String> streamMetrics, String key, JsonNode metric) {
        String streamName = Name.urnOfId(metric.get("stream").asText());
        String computationName = Name.urnOfId(metric.get("group").asText());
        String groupKey = streamName + ":" + computationName;
        if ("nuxeo.streams.global.stream.group.end".equals(key)) {
            streamMetrics.put(streamName + ":end", metric.get("v").asText());
        } else if ("nuxeo.streams.global.stream.group.lag".equals(key)) {
            streamMetrics.put(groupKey + ":lag", metric.get("v").asText());
        } else if ("nuxeo.streams.global.stream.group.latency".equals(key)) {
            // NOTE(review): presumably milliseconds converted to seconds, the label prints an "s" suffix - confirm
            streamMetrics.put(groupKey + ":latency", getNiceDouble(metric.get("v").asDouble() / 1000.0));
        } else if ("nuxeo.streams.global.stream.group.pos".equals(key)) {
            streamMetrics.put(groupKey + ":pos", metric.get("v").asText());
        }
    }

    /**
     * Stores the count, sum, p50, mean, p99, rate1m and rate5m of a timer metric under
     * {@code <computation>:<hostIp><infix>:<field>}. Nothing is stored when the count is 0.
     *
     * @param streamMetrics the map to fill
     * @param hostIp the ip of the host that reported the metric
     * @param infix an empty string for the record timer, {@code ":timer"} for the timer callback
     * @param metric the metric node
     */
    protected void parseTimerMetric(Map<String, String> streamMetrics, String hostIp, String infix,
            JsonNode metric) {
        if (metric.get("count").asInt() == 0) {
            return;
        }
        String baseKey = Name.urnOfId(metric.get("computation").asText()) + ":" + hostIp + infix;
        streamMetrics.put(baseKey + ":count", metric.get("count").asText());
        // NOTE(review): presumably nanoseconds converted to seconds, the label prints an "s" suffix - confirm
        streamMetrics.put(baseKey + ":sum", getNiceDouble3(metric.get("sum").asDouble() / 1000000000));
        streamMetrics.put(baseKey + ":p50", getNiceDouble3(metric.get("p50").asDouble()));
        streamMetrics.put(baseKey + ":mean", getNiceDouble3(metric.get("mean").asDouble()));
        streamMetrics.put(baseKey + ":p99", getNiceDouble3(metric.get("p99").asDouble()));
        streamMetrics.put(baseKey + ":rate1m", getNiceDouble(metric.get("rate1m").asDouble()));
        streamMetrics.put(baseKey + ":rate5m", getNiceDouble(metric.get("rate5m").asDouble()));
    }

    /**
     * Formats a number with 2 decimals.
     */
    protected String getNiceDouble(Double number) {
        return String.format("%.2f", number);
    }

    /**
     * Formats a number with 3 decimals.
     */
    protected String getNiceDouble3(Double number) {
        return String.format("%.3f", number);
    }

    /**
     * Returns the title line followed by the {@code skinparam} settings of the diagram.
     *
     * @param title the diagram title
     */
    protected String getPumlHeader(String title) {
        return "title " + title + "\n\n" //
                + "skinparam defaultFontName Courier\n" //
                + "skinparam handwritten false\n" //
                + "skinparam queueBackgroundColor LightYellow\n" //
                + "skinparam nodeBackgroundColor Azure\n" //
                + "skinparam componentBackgroundColor Azure\n" //
                + "skinparam nodebackgroundColor<<failure>> Yellow\n" //
                + "skinparam componentbackgroundColor<<failure>> Yellow\n" //
                + "skinparam component {\n" //
                + " BorderColor black\n" //
                + " ArrowColor #CC6655\n" //
                + "}\n";
    }

    /**
     * Returns the diagram identifier of a topology element. Computations are suffixed with the host so that the
     * identifier matches the component emitted by {@link #dumpComputation}; other elements are not host specific.
     *
     * @param host the ip of the processor host
     * @param id the element as found in the topology
     */
    protected String getPumlIdentifierForHost(String host, String id) {
        if (id.startsWith("computation:")) {
            return getPumlIdentifier(id + ":" + host);
        }
        return getPumlIdentifier(id);
    }

    /**
     * Appends a {@code queue} element showing the name, partitions, codec and record count of a stream.
     *
     * @param ret the diagram being built
     * @param item one element of the {@code streams} array
     * @param metrics the flattened metrics returned by {@link #parseMetrics()}
     */
    protected void dumpStream(StringBuilder ret, JsonNode item, Map<String, String> metrics) {
        String name = item.get("name").asText();
        String partitions = item.get("partitions").asText();
        String codec = item.get("codec").asText();
        ret.append(String.format("queue %s [%s%n----%npartitions: %s%ncodec: %s%n-----%nrecords: %s]%n",
                getPumlIdentifier("stream:" + name), name, partitions, codec, getStreamEnd(metrics, name)));
    }

    /**
     * Returns the {@code <name>:end} metric, or {@code "0"} when there is none.
     */
    protected String getStreamEnd(Map<String, String> metrics, String name) {
        String ret = metrics.get(name + ":end");
        return ret == null ? "0" : ret;
    }

    /**
     * Appends a {@code component} element for a computation running on a host. The component is tagged
     * {@code <<failure>>} when a failure metric exists for this computation on this host.
     *
     * @param host the ip of the processor host
     * @param ret the diagram being built
     * @param item one element of the processor {@code computations} array
     * @param metrics the flattened metrics returned by {@link #parseMetrics()}
     */
    protected void dumpComputation(String host, StringBuilder ret, JsonNode item, Map<String, String> metrics) {
        String name = item.get("name").asText();
        String threads = item.get("threads").asText();
        String continueOnFailure = item.get("continueOnFailure").asText();
        String failure = metrics.containsKey(name + ":" + host + ":failure") ? " <<failure>>" : "";
        ret.append(String.format("component %s %s[%s%n----%nthreads: %s%ncontinue on failure: %s%n%s%s]%n",
                getPumlIdentifier("computation:" + name + ":" + host), failure, name + " on " + host, threads,
                continueOnFailure, getBatchInfo(item), getComputationMetrics(host, name, item, metrics)));
    }

    /**
     * Builds the metrics part of a computation component.
     *
     * @param host the ip of the processor host
     * @param name the computation name
     * @param item the computation node (not used by this implementation)
     * @param metrics the flattened metrics returned by {@link #parseMetrics()}
     * @return an empty string when there is no record count for the computation on this host, else the record
     *         statistics, followed by the timer statistics when available
     */
    protected String getComputationMetrics(String host, String name, JsonNode item, Map<String, String> metrics) {
        String baseKey = name + ":" + host;
        if (!metrics.containsKey(baseKey + ":count")) {
            return "";
        }
        StringBuilder ret = new StringBuilder("\n----\n");
        if (metrics.containsKey(baseKey + ":failure")) {
            ret.append("FAILURE: ").append(metrics.get(baseKey + ":failure")).append("\n");
        }
        ret.append("record count: ").append(metrics.get(baseKey + ":count"));
        ret.append(", total: ").append(metrics.get(baseKey + ":sum")).append("s\n");
        if (metrics.containsKey(baseKey + ":skipped")) {
            ret.append("record skipped: ").append(metrics.get(baseKey + ":skipped")).append("\n");
        }
        ret.append("mean: ").append(metrics.get(baseKey + ":mean"));
        ret.append("s, p50: ").append(metrics.get(baseKey + ":p50"));
        ret.append("s, p99: ").append(metrics.get(baseKey + ":p99")).append("s\n");
        ret.append("rate 1min: ").append(metrics.get(baseKey + ":rate1m"));
        ret.append("op/s, 5min: ").append(metrics.get(baseKey + ":rate5m")).append("op/s");

        String timerKey = baseKey + ":timer";
        if (!metrics.containsKey(timerKey + ":count")) {
            return ret.toString();
        }
        ret.append("\n----\n");
        ret.append("timer count: ").append(metrics.get(timerKey + ":count"));
        ret.append(", total: ").append(metrics.get(timerKey + ":sum")).append("s\n");
        ret.append("mean: ").append(metrics.get(timerKey + ":mean"));
        ret.append("s, p50: ").append(metrics.get(timerKey + ":p50"));
        ret.append("s, p99: ").append(metrics.get(timerKey + ":p99")).append("s\n");
        ret.append("rate 5min: ").append(metrics.get(timerKey + ":rate5m")).append("op/s");
        return ret.toString();
    }

    /**
     * Builds the batch and retry lines of a computation component.
     *
     * @param item the computation node
     * @return two lines: the batch settings (or {@code no batch}) and the retry settings (or {@code no retry})
     */
    protected String getBatchInfo(JsonNode item) {
        StringBuilder ret = new StringBuilder();
        if (item.get("batchCapacity").asInt() > 1) {
            ret.append("batch ").append(item.get("batchCapacity").asText());
            // The threshold was previously read from "batchCapacity", printing the capacity twice.
            // NOTE(review): assumes the threshold is published as "batchThresholdMs" - confirm with the producer
            JsonNode threshold = item.path("batchThresholdMs");
            if (!threshold.isMissingNode()) {
                ret.append(" ").append(threshold.asInt()).append("ms");
            }
            ret.append("\n");
        } else {
            ret.append("no batch\n");
        }
        if (item.get("maxRetries").asInt() > 1) {
            ret.append("max retry: ").append(item.get("maxRetries").asText());
            ret.append(", delay: ").append(item.get("retryDelayMs").asText()).append("ms");
        } else {
            ret.append("no retry");
        }
        return ret.toString();
    }

    /**
     * Turns a name into a diagram identifier by replacing every character that is not an ASCII letter or digit
     * with a dot.
     */
    protected String getPumlIdentifier(String name) {
        return name.replaceAll("[^a-zA-Z0-9]", ".");
    }

}