001/*
002 * (C) Copyright 2020 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import static org.nuxeo.lib.stream.computation.log.ComputationRunner.NUXEO_METRICS_REGISTRY_NAME;
022
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import java.util.stream.Collectors;
029
030import org.apache.logging.log4j.Logger;
031import org.nuxeo.lib.stream.codec.AvroMessageCodec;
032import org.nuxeo.lib.stream.codec.Codec;
033import org.nuxeo.lib.stream.computation.AbstractComputation;
034import org.nuxeo.lib.stream.computation.ComputationContext;
035import org.nuxeo.lib.stream.computation.Record;
036import org.nuxeo.lib.stream.computation.Watermark;
037import org.nuxeo.lib.stream.log.Latency;
038import org.nuxeo.lib.stream.log.LogManager;
039import org.nuxeo.lib.stream.log.Name;
040import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
041import org.nuxeo.runtime.api.Framework;
042
043import io.dropwizard.metrics5.Gauge;
044import io.dropwizard.metrics5.MetricName;
045import io.dropwizard.metrics5.MetricRegistry;
046import io.dropwizard.metrics5.SharedMetricRegistries;
047
048/**
049 * A computation that exposes Nuxeo Stream metrics as Dropwizard metrics.
050 *
051 * @since 11.1
052 */
053public class StreamMetricsComputation extends AbstractComputation {
054    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(
055            StreamMetricsComputation.class);
056
057    protected static final String NAME = "stream/metrics";
058
059    protected MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
060
061    protected final long intervalMs;
062
063    protected final List<String> inputStreams;
064
065    protected final List<Name> streams = new ArrayList<>();
066
067    protected final Set<Name> invalidStreams = new HashSet<>();
068
069    protected final List<LogPartitionGroup> groups = new ArrayList<>();
070
071    protected final List<LatencyMetric> metrics = new ArrayList<>();
072
073    protected LogManager manager;
074
075    protected final Codec<Record> codec = new AvroMessageCodec<>(Record.class);
076
077    protected long refreshGroupCounter;
078
079    public StreamMetricsComputation(Duration interval, List<String> streams) {
080        super(NAME, 1, 0);
081        this.intervalMs = interval.toMillis();
082        this.inputStreams = streams;
083    }
084
085    @Override
086    public void init(ComputationContext context) {
087        if (context.isSpareComputation()) {
088            log.info("Spare instance nothing to report");
089            unregisterMetrics();
090        } else {
091            log.warn("Instance elected to report stream metrics");
092            context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
093        }
094    }
095
096    @Override
097    public void destroy() {
098        unregisterMetrics();
099    }
100
101    protected void registerMetrics() {
102        unregisterMetrics();
103        getGroups().forEach(group -> metrics.add(new LatencyMetric(group, registry)));
104    }
105
106    protected void unregisterMetrics() {
107        metrics.forEach(LatencyMetric::destroy);
108        metrics.clear();
109    }
110
111    @Override
112    public void processTimer(ComputationContext context, String key, long timestamp) {
113        refreshMetricsIfNeeded();
114        log.debug("start update metrics: {}", metrics::size);
115        List<LatencyMetric> toRemove = metrics.stream()
116                                              .filter(metric -> metric.update(getManager(), codec))
117                                              .collect(Collectors.toList());
118        toRemove.forEach(LatencyMetric::destroy);
119        toRemove.forEach(metric -> invalidStreams.add(metric.getStream()));
120        metrics.removeAll(toRemove);
121        context.setTimer("tracker", System.currentTimeMillis() + intervalMs);
122    }
123
124    protected void refreshMetricsIfNeeded() {
125        if (streams.isEmpty() || groups.isEmpty() || metrics.isEmpty() || ++refreshGroupCounter % 5 == 0) {
126            streams.clear();
127            groups.clear();
128            registerMetrics();
129        }
130    }
131
132    protected List<Name> getStreams() {
133        if (streams.isEmpty()) {
134            if (inputStreams == null || inputStreams.isEmpty()) {
135                streams.addAll(getManager().listAllNames());
136                log.debug("Use all available streams: {}", streams);
137            } else {
138                inputStreams.forEach(stream -> streams.add(Name.ofUrn(stream)));
139                log.debug("Use input streams: {}", streams);
140            }
141            if (!invalidStreams.isEmpty()) {
142                streams.removeAll(invalidStreams);
143                log.debug("Filtered list of streams: {}", streams);
144            }
145        }
146        return streams;
147    }
148
149    protected List<LogPartitionGroup> getGroups() {
150        if (groups.isEmpty()) {
151            getStreams().forEach(name -> {
152                getManager().listConsumerGroups(name)
153                            .forEach(group -> groups.add(new LogPartitionGroup(group, name, 0)));
154            });
155            log.info("Update list of consumers: {}", groups);
156        }
157        return groups;
158    }
159
160    protected LogManager getManager() {
161        // it is ok to cache a service here, hot reload will restart the computation
162        if (manager == null) {
163            manager = Framework.getService(StreamService.class).getLogManager();
164        }
165        return manager;
166    }
167
168    @Override
169    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
170        // this is not supposed to happen
171    }
172
173    public static class LatencyMetric {
174        public static final Object PREFIX = "nuxeo.streams.global.stream.group.";
175
176        protected final LogPartitionGroup consumer;
177
178        protected final MetricRegistry registry;
179
180        protected final MetricName endMetric;
181
182        protected final MetricName posMetric;
183
184        protected final MetricName lagMetric;
185
186        protected final MetricName latMetric;
187
188        protected Latency latency;
189
190        protected boolean registered;
191
192        public LatencyMetric(LogPartitionGroup consumer, MetricRegistry registry) {
193            this.consumer = consumer;
194            this.registry = registry;
195            endMetric = getMetricName("end");
196            posMetric = getMetricName("pos");
197            lagMetric = getMetricName("lag");
198            latMetric = getMetricName("latency");
199        }
200
201        protected MetricName getMetricName(String name) {
202            return MetricName.build(PREFIX + name)
203                             .tagged("stream", consumer.name.getId())
204                             .tagged("group", consumer.group.getId());
205        }
206
207        protected void registerMetrics() {
208            registry.register(endMetric, (Gauge<Long>) () -> latency.lag().upper());
209            registry.register(posMetric, (Gauge<Long>) () -> latency.lag().lower());
210            registry.register(lagMetric, (Gauge<Long>) () -> latency.lag().lag());
211            registry.register(latMetric, (Gauge<Long>) () -> latency.latency());
212        }
213
214        protected void unregisterMetrics() {
215            registry.remove(endMetric);
216            registry.remove(posMetric);
217            registry.remove(lagMetric);
218            registry.remove(latMetric);
219        }
220
221        public boolean update(LogManager manager, Codec<Record> codec) {
222            try {
223                latency = manager.getLatency(consumer.name, consumer.group, codec,
224                        (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey));
225                if (!registered) {
226                    registerMetrics();
227                    registered = true;
228                }
229            } catch (Exception e) {
230                if (e.getCause() instanceof ClassNotFoundException || e.getCause() instanceof ClassCastException
231                        || e instanceof IllegalStateException || e instanceof IllegalArgumentException) {
232                    log.warn("Invalid stream, cannot get latency: " + consumer, e);
233                    return true;
234                }
235                throw e;
236            }
237            return false;
238        }
239
240        public void destroy() {
241            unregisterMetrics();
242        }
243
244        public Name getStream() {
245            return consumer.getLogPartition().name();
246        }
247
248    }
249
250}