001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.stream;
020
021import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_NULL;
023
024import java.time.Duration;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.stream.Collectors;
030
031import org.nuxeo.lib.stream.computation.Topology;
032import org.nuxeo.runtime.api.Framework;
033import org.nuxeo.runtime.services.config.ConfigurationService;
034
035/**
036 * A processor fetching stream metrics populating Dropwizard metrics.
037 *
038 * @since 11.1
039 */
040public class StreamMetricsProcessor implements StreamProcessorTopology {
041
042    protected static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(1);
043
044    protected static final String STREAM_METRICS_FETCH_INTERVAL = "metrics.streams.interval";
045
046    protected static final String STREAM_METRICS_LIST = "metrics.streams.list";
047
048    @Override
049    public Topology getTopology(Map<String, String> options) {
050        ConfigurationService confService = Framework.getService(ConfigurationService.class);
051        Duration interval = confService.getDuration(STREAM_METRICS_FETCH_INTERVAL, DEFAULT_INTERVAL);
052        String streams = confService.getString(STREAM_METRICS_LIST, null);
053        List<String> inputStreams = parseInputStreams(streams);
054        return Topology.builder()
055                       .addComputation(() -> new StreamMetricsComputation(interval, inputStreams),
056                               Collections.singletonList(INPUT_1 + ":" + INPUT_NULL))
057                       .build();
058    }
059
060    protected List<String> parseInputStreams(String streams) {
061        if (streams == null || streams.isBlank()) {
062            return null;
063        }
064        return Arrays.stream(streams.split(",")).map(String::trim).collect(Collectors.toList());
065    }
066}