001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.bulk.introspection;
020
021import java.time.Instant;
022import java.util.HashMap;
023import java.util.Map;
024
025import org.nuxeo.lib.stream.log.Name;
026
027import com.fasterxml.jackson.core.JsonProcessingException;
028import com.fasterxml.jackson.databind.JsonNode;
029import com.fasterxml.jackson.databind.ObjectMapper;
030
031/**
032 * @since 11.5
033 */
034public class StreamIntrospectionConverter {
035    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
036
037    protected final String json;
038
039    protected final JsonNode root;
040
041    public StreamIntrospectionConverter(String json) {
042        this.json = json;
043        try {
044            this.root = OBJECT_MAPPER.readTree(json);
045        } catch (JsonProcessingException e) {
046            throw new IllegalArgumentException("Invalid JSON: " + json, e);
047        }
048    }
049
050    public String getPuml() {
051        StringBuilder ret = new StringBuilder();
052        ret.append("@startuml\n");
053        Map<String, String> streamMetrics = parseMetrics();
054        ret.append(getPumlHeader("Stream Introspection at " + streamMetrics.get("date")));
055        JsonNode node = root.get("streams");
056        if (node.isArray()) {
057            for (JsonNode item : node) {
058                dumpStream(ret, item, streamMetrics);
059            }
060        }
061
062        node = root.get("processors");
063        if (node.isArray()) {
064            for (JsonNode item : node) {
065                String host = item.at("/metadata/ip").asText();
066                // ret.append("rectangle node." + host + " {\n");
067                JsonNode computations = item.get("computations");
068                if (computations.isArray()) {
069                    for (JsonNode computation : computations) {
070                        dumpComputation(host, ret, computation, streamMetrics);
071                    }
072                }
073                // ret.append("}\n");
074                JsonNode topologies = item.get("topology");
075                if (topologies.isArray()) {
076                    for (JsonNode topo : topologies) {
077                        String comment = "";
078                        String source = topo.get(0).asText();
079                        String target = topo.get(1).asText();
080                        if (target.startsWith("computation:")) {
081                            String stream = source.replace("stream:", "");
082                            String computation = target.replace("computation:", "");
083                            String lag = streamMetrics.get(stream + ":" + computation + ":lag");
084                            String latency = streamMetrics.get(stream + ":" + computation + ":latency");
085                            String pos = streamMetrics.get(stream + ":" + computation + ":pos");
086                            String end = getStreamEnd(streamMetrics, stream);
087                            // provide info only when there is a lag
088                            if (lag != null && !"0".equals(lag)) {
089                                comment = String.format(": %s/%s lag: %s, latency: %ss", pos, end, lag, latency);
090                            }
091                        }
092                        ret.append(String.format("%s==>%s%s%n", getPumlIdentifierForHost(host, source),
093                                getPumlIdentifierForHost(host, target), comment));
094                    }
095                }
096
097            }
098        }
099
100        ret.append("@enduml\n");
101        return ret.toString();
102    }
103
104    protected Map<String, String> parseMetrics() {
105        Map<String, String> streamMetrics = new HashMap<>();
106        JsonNode node = root.get("metrics");
107        long timestamp = 0;
108        if (node.isArray()) {
109            for (JsonNode host : node) {
110                String hostIp = host.get("ip").asText();
111                long metricTimestamp = host.get("timestamp").asLong();
112                if (metricTimestamp > timestamp) {
113                    timestamp = metricTimestamp;
114                }
115                JsonNode hostMetrics = host.get("metrics");
116                if (hostMetrics.isArray()) {
117                    for (JsonNode metric : hostMetrics) {
118
119                        if (metric.has("stream")) {
120                            String key = metric.get("k").asText();
121                            String streamName = Name.urnOfId(metric.get("stream").asText());
122                            String computationName = Name.urnOfId(metric.get("group").asText());
123                            if ("nuxeo.streams.global.stream.group.end".equals(key)) {
124                                streamMetrics.put(streamName + ":end", metric.get("v").asText());
125                            } else if ("nuxeo.streams.global.stream.group.lag".equals(key)) {
126                                streamMetrics.put(streamName + ":" + computationName + ":lag",
127                                        metric.get("v").asText());
128                            } else if ("nuxeo.streams.global.stream.group.latency".equals(key)) {
129                                streamMetrics.put(streamName + ":" + computationName + ":latency",
130                                        getNiceDouble(metric.get("v").asDouble() / 1000.0));
131                            } else if ("nuxeo.streams.global.stream.group.pos".equals(key)) {
132                                streamMetrics.put(streamName + ":" + computationName + ":pos",
133                                        metric.get("v").asText());
134                            }
135                        } else if (metric.get("k").asText().endsWith("processRecord")) {
136                            int count = metric.get("count").asInt();
137                            if (count == 0) {
138                                continue;
139                            }
140                            String computationName = Name.urnOfId(metric.get("computation").asText());
141                            streamMetrics.put(computationName + ":" + hostIp + ":count", metric.get("count").asText());
142                            streamMetrics.put(computationName + ":" + hostIp + ":sum",
143                                    getNiceDouble3(metric.get("sum").asDouble() / 1000000000));
144                            streamMetrics.put(computationName + ":" + hostIp + ":p50",
145                                    getNiceDouble3(metric.get("p50").asDouble()));
146                            streamMetrics.put(computationName + ":" + hostIp + ":mean",
147                                    getNiceDouble3(metric.get("mean").asDouble()));
148                            streamMetrics.put(computationName + ":" + hostIp + ":p99",
149                                    getNiceDouble3(metric.get("p99").asDouble()));
150                            streamMetrics.put(computationName + ":" + hostIp + ":rate1m",
151                                    getNiceDouble(metric.get("rate1m").asDouble()));
152                            streamMetrics.put(computationName + ":" + hostIp + ":rate5m",
153                                    getNiceDouble(metric.get("rate5m").asDouble()));
154                        } else if (metric.get("k").asText().endsWith("processTimer")) {
155                            int count = metric.get("count").asInt();
156                            if (count == 0) {
157                                continue;
158                            }
159                            String computationName = Name.urnOfId(metric.get("computation").asText());
160                            streamMetrics.put(computationName + ":" + hostIp + ":timer:count",
161                                    metric.get("count").asText());
162                            streamMetrics.put(computationName + ":" + hostIp + ":timer:sum",
163                                    getNiceDouble3(metric.get("sum").asDouble() / 1000000000));
164                            streamMetrics.put(computationName + ":" + hostIp + ":timer:p50",
165                                    getNiceDouble3(metric.get("p50").asDouble()));
166                            streamMetrics.put(computationName + ":" + hostIp + ":timer:mean",
167                                    getNiceDouble3(metric.get("mean").asDouble()));
168                            streamMetrics.put(computationName + ":" + hostIp + ":timer:p99",
169                                    getNiceDouble3(metric.get("p99").asDouble()));
170                            streamMetrics.put(computationName + ":" + hostIp + ":timer:rate1m",
171                                    getNiceDouble(metric.get("rate1m").asDouble()));
172                            streamMetrics.put(computationName + ":" + hostIp + ":timer:rate5m",
173                                    getNiceDouble(metric.get("rate5m").asDouble()));
174                        } else if (metric.get("k").asText().endsWith("computation.failure")) {
175                            int failure = metric.get("v").asInt();
176                            if (failure > 0) {
177                                String computationName = Name.urnOfId(metric.get("computation").asText()) + ":"
178                                        + hostIp;
179                                streamMetrics.put(computationName + ":failure", metric.get("v").asText());
180                            }
181                        } else if (metric.get("k").asText().endsWith("stream.failure")) {
182                            int value = metric.get("v").asInt();
183                            if (value > 0) {
184                                streamMetrics.put(hostIp + ":failure", metric.get("v").asText());
185                            }
186                        } else if (metric.get("k").asText().endsWith("computation.skippedRecord")) {
187                            int value = metric.get("v").asInt();
188                            if (value > 0) {
189                                String computationName = Name.urnOfId(metric.get("computation").asText()) + ":"
190                                        + hostIp;
191                                streamMetrics.put(computationName + ":skipped", metric.get("v").asText());
192                            }
193                        }
194                    }
195                }
196            }
197        }
198        streamMetrics.put("timestamp", String.valueOf(timestamp));
199        streamMetrics.put("date", Instant.ofEpochSecond(timestamp).toString());
200        return streamMetrics;
201    }
202
203    protected String getNiceDouble(Double number) {
204        return String.format("%.2f", number);
205    }
206
207    protected String getNiceDouble3(Double number) {
208        return String.format("%.3f", number);
209    }
210
211    protected String getPumlHeader(String title) {
212        return "title " + title + "\n\n" //
213                + "skinparam defaultFontName Courier\n" + "skinparam handwritten false\n" //
214                + "skinparam queueBackgroundColor LightYellow\n" //
215                + "skinparam nodeBackgroundColor Azure\n" //
216                + "skinparam componentBackgroundColor Azure\n" //
217                + "skinparam nodebackgroundColor<<failure>> Yellow\n" //
218                + "skinparam componentbackgroundColor<<failure>> Yellow\n" //
219                + "skinparam component {\n"
220                + "  BorderColor black\n" + "  ArrowColor #CC6655\n" + "}\n";
221    }
222
223    protected String getPumlIdentifierForHost(String host, String id) {
224        if (id.startsWith("computation:")) {
225            return getPumlIdentifier(id + ":" + host);
226        }
227        return getPumlIdentifier(id);
228    }
229
230    protected void dumpStream(StringBuilder ret, JsonNode item, Map<String, String> metrics) {
231        String name = item.get("name").asText();
232        String partitions = item.get("partitions").asText();
233        String codec = item.get("codec").asText();
234        ret.append(String.format("queue %s [%s%n----%npartitions: %s%ncodec: %s%n-----%nrecords: %s]%n",
235                getPumlIdentifier("stream:" + name), name, partitions, codec, getStreamEnd(metrics, name)));
236    }
237
238    protected String getStreamEnd(Map<String, String> metrics, String name) {
239        String ret = metrics.get(name + ":end");
240        return ret == null ? "0" : ret;
241    }
242
243    protected void dumpComputation(String host, StringBuilder ret, JsonNode item, Map<String, String> metrics) {
244        String name = item.get("name").asText();
245        String threads = item.get("threads").asText();
246        String continueOnFailure = item.get("continueOnFailure").asText();
247        String failure = "";
248        if (metrics.containsKey(name + ":" + host + ":failure")) {
249            failure = " <<failure>>";
250        }
251        ret.append(String.format("component %s %s[%s%n----%nthreads: %s%ncontinue on failure: %s%n%s%s]%n",
252                getPumlIdentifier("computation:" + name + ":" + host), failure, name + " on " + host, threads, continueOnFailure,
253                getBatchInfo(item), getComputationMetrics(host, name, item, metrics)));
254    }
255
256    protected String getComputationMetrics(String host, String name, JsonNode item, Map<String, String> metrics) {
257        String ret = "";
258        String baseKey = name + ":" + host;
259        if (!metrics.containsKey(baseKey + ":count")) {
260            return ret;
261        }
262        ret += "\n----\n";
263        if (metrics.containsKey(baseKey + ":failure")) {
264            ret += "FAILURE: " + metrics.get(baseKey + ":failure") + "\n";
265        }
266        ret += "record count: " + metrics.get(baseKey + ":count") + ", total: " + metrics.get(baseKey + ":sum") + "s\n";
267        if (metrics.containsKey(baseKey + ":skipped")) {
268            ret += "record skipped: " + metrics.get(baseKey + ":skipped") + "\n";
269        }
270        ret += "mean: " + metrics.get(baseKey + ":mean") + "s, p50: " + metrics.get(baseKey + ":p50") + "s, p99: "
271                + metrics.get(baseKey + ":p99") + "s\n";
272        ret += "rate 1min: " + metrics.get(baseKey + ":rate1m") + "op/s, 5min: " + metrics.get(baseKey + ":rate5m")
273                + "op/s";
274        if (!metrics.containsKey(baseKey + ":timer:count")) {
275            return ret;
276        }
277        ret += "\n----\n";
278        baseKey = baseKey + ":timer";
279        ret += "timer count: " + metrics.get(baseKey + ":count") + ", total: " + metrics.get(baseKey + ":sum") + "s\n";
280        ret += "mean: " + metrics.get(baseKey + ":mean") + "s, p50: " + metrics.get(baseKey + ":p50") + "s, p99: "
281                + metrics.get(baseKey + ":p99") + "s\n";
282        ret += "rate 5min: " + metrics.get(baseKey + ":rate5m") + "op/s";
283        return ret;
284    }
285
286    protected String getBatchInfo(JsonNode item) {
287        String ret = "";
288        int batchCapacity = item.get("batchCapacity").asInt();
289        if (batchCapacity > 1) {
290            int batchThresholdMs = item.get("batchCapacity").asInt();
291            ret += "batch " + item.get("batchCapacity").asText() + " " + batchThresholdMs + "ms\n";
292        } else {
293            ret += "no batch\n";
294        }
295        int retry = item.get("maxRetries").asInt();
296        if (retry > 1) {
297            ret += "max retry: " + item.get("maxRetries").asText() + ", delay: " + item.get("retryDelayMs").asText()
298                    + "ms";
299        } else {
300            ret += "no retry";
301        }
302        return ret;
303    }
304
305    protected String getPumlIdentifier(String name) {
306        return name.replaceAll("[^a-zA-Z0-9]", ".");
307    }
308
309}