001/* 002 * (C) Copyright 2020 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.stream; 020 021import static org.nuxeo.lib.stream.computation.log.ComputationRunner.NUXEO_METRICS_REGISTRY_NAME; 022 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028import java.util.stream.Collectors; 029 030import org.apache.logging.log4j.Logger; 031import org.nuxeo.lib.stream.codec.AvroMessageCodec; 032import org.nuxeo.lib.stream.codec.Codec; 033import org.nuxeo.lib.stream.computation.AbstractComputation; 034import org.nuxeo.lib.stream.computation.ComputationContext; 035import org.nuxeo.lib.stream.computation.Record; 036import org.nuxeo.lib.stream.computation.Watermark; 037import org.nuxeo.lib.stream.log.Latency; 038import org.nuxeo.lib.stream.log.LogManager; 039import org.nuxeo.lib.stream.log.Name; 040import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 041import org.nuxeo.runtime.api.Framework; 042 043import io.dropwizard.metrics5.Gauge; 044import io.dropwizard.metrics5.MetricName; 045import io.dropwizard.metrics5.MetricRegistry; 046import io.dropwizard.metrics5.SharedMetricRegistries; 047 048/** 049 * A computation that exposes Nuxeo Stream metrics as Dropwizard metrics. 050 * 051 * @since 11.1 052 */ 053public class StreamMetricsComputation extends AbstractComputation { 054 private static final Logger log = org.apache.logging.log4j.LogManager.getLogger( 055 StreamMetricsComputation.class); 056 057 protected static final String NAME = "stream/metrics"; 058 059 protected MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME); 060 061 protected final long intervalMs; 062 063 protected final List<String> inputStreams; 064 065 protected final List<Name> streams = new ArrayList<>(); 066 067 protected final Set<Name> invalidStreams = new HashSet<>(); 068 069 protected final List<LogPartitionGroup> groups = new ArrayList<>(); 070 071 protected final List<LatencyMetric> metrics = new ArrayList<>(); 072 073 protected LogManager manager; 074 075 protected final Codec<Record> codec = new AvroMessageCodec<>(Record.class); 076 077 protected long refreshGroupCounter; 078 079 public StreamMetricsComputation(Duration interval, List<String> streams) { 080 super(NAME, 1, 0); 081 this.intervalMs = interval.toMillis(); 082 this.inputStreams = streams; 083 } 084 085 @Override 086 public void init(ComputationContext context) { 087 if (context.isSpareComputation()) { 088 log.info("Spare instance nothing to report"); 089 unregisterMetrics(); 090 } else { 091 log.warn("Instance elected to report stream metrics"); 092 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 093 } 094 } 095 096 @Override 097 public void destroy() { 098 unregisterMetrics(); 099 } 100 101 protected void registerMetrics() { 102 unregisterMetrics(); 103 getGroups().forEach(group -> metrics.add(new LatencyMetric(group, registry))); 104 } 105 106 protected void unregisterMetrics() { 107 metrics.forEach(LatencyMetric::destroy); 108 metrics.clear(); 109 } 110 111 @Override 112 public void processTimer(ComputationContext context, String key, long timestamp) { 113 refreshMetricsIfNeeded(); 114 log.debug("start update metrics: {}", metrics::size); 115 List<LatencyMetric> toRemove = metrics.stream() 116 .filter(metric -> metric.update(getManager(), codec)) 117 .collect(Collectors.toList()); 118 toRemove.forEach(LatencyMetric::destroy); 119 toRemove.forEach(metric -> invalidStreams.add(metric.getStream())); 120 metrics.removeAll(toRemove); 121 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 122 } 123 124 protected void refreshMetricsIfNeeded() { 125 if (streams.isEmpty() || groups.isEmpty() || metrics.isEmpty() || ++refreshGroupCounter % 5 == 0) { 126 streams.clear(); 127 groups.clear(); 128 registerMetrics(); 129 } 130 } 131 132 protected List<Name> getStreams() { 133 if (streams.isEmpty()) { 134 if (inputStreams == null || inputStreams.isEmpty()) { 135 streams.addAll(getManager().listAllNames()); 136 log.debug("Use all available streams: {}", streams); 137 } else { 138 inputStreams.forEach(stream -> streams.add(Name.ofUrn(stream))); 139 log.debug("Use input streams: {}", streams); 140 } 141 if (!invalidStreams.isEmpty()) { 142 streams.removeAll(invalidStreams); 143 log.debug("Filtered list of streams: {}", streams); 144 } 145 } 146 return streams; 147 } 148 149 protected List<LogPartitionGroup> getGroups() { 150 if (groups.isEmpty()) { 151 getStreams().forEach(name -> { 152 getManager().listConsumerGroups(name) 153 .forEach(group -> groups.add(new LogPartitionGroup(group, name, 0))); 154 }); 155 log.info("Update list of consumers: {}", groups); 156 } 157 return groups; 158 } 159 160 protected LogManager getManager() { 161 // it is ok to cache a service here, hot reload will restart the computation 162 if (manager == null) { 163 manager = Framework.getService(StreamService.class).getLogManager(); 164 } 165 return manager; 166 } 167 168 @Override 169 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 170 // this is not supposed to happen 171 } 172 173 public static class LatencyMetric { 174 public static final Object PREFIX = "nuxeo.streams.global.stream.group."; 175 176 protected final LogPartitionGroup consumer; 177 178 protected final MetricRegistry registry; 179 180 protected final MetricName endMetric; 181 182 protected final MetricName posMetric; 183 184 protected final MetricName lagMetric; 185 186 protected final MetricName latMetric; 187 188 protected Latency latency; 189 190 protected boolean registered; 191 192 public LatencyMetric(LogPartitionGroup consumer, MetricRegistry registry) { 193 this.consumer = consumer; 194 this.registry = registry; 195 endMetric = getMetricName("end"); 196 posMetric = getMetricName("pos"); 197 lagMetric = getMetricName("lag"); 198 latMetric = getMetricName("latency"); 199 } 200 201 protected MetricName getMetricName(String name) { 202 return MetricName.build(PREFIX + name) 203 .tagged("stream", consumer.name.getId()) 204 .tagged("group", consumer.group.getId()); 205 } 206 207 protected void registerMetrics() { 208 registry.register(endMetric, (Gauge<Long>) () -> latency.lag().upper()); 209 registry.register(posMetric, (Gauge<Long>) () -> latency.lag().lower()); 210 registry.register(lagMetric, (Gauge<Long>) () -> latency.lag().lag()); 211 registry.register(latMetric, (Gauge<Long>) () -> latency.latency()); 212 } 213 214 protected void unregisterMetrics() { 215 registry.remove(endMetric); 216 registry.remove(posMetric); 217 registry.remove(lagMetric); 218 registry.remove(latMetric); 219 } 220 221 public boolean update(LogManager manager, Codec<Record> codec) { 222 try { 223 latency = manager.getLatency(consumer.name, consumer.group, codec, 224 (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)); 225 if (!registered) { 226 registerMetrics(); 227 registered = true; 228 } 229 } catch (Exception e) { 230 if (e.getCause() instanceof ClassNotFoundException || e.getCause() instanceof ClassCastException 231 || e instanceof IllegalStateException || e instanceof IllegalArgumentException) { 232 log.warn("Invalid stream, cannot get latency: " + consumer, e); 233 return true; 234 } 235 throw e; 236 } 237 return false; 238 } 239 240 public void destroy() { 241 unregisterMetrics(); 242 } 243 244 public Name getStream() { 245 return consumer.getLogPartition().name(); 246 } 247 248 } 249 250}