001/*
002 * (C) Copyright 2006-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 *
017 * Contributors:
018 *     anechaev
019 */
020package org.nuxeo.runtime.kafka;
021
022import java.util.Properties;
023import java.util.Set;
024import java.util.stream.Collectors;
025
026import org.apache.kafka.clients.consumer.ConsumerConfig;
027import org.apache.kafka.clients.producer.ProducerConfig;
028import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
029import org.nuxeo.runtime.model.DefaultComponent;
030import org.nuxeo.runtime.model.Descriptor;
031
032public class KafkaConfigServiceImpl extends DefaultComponent implements KafkaConfigService {
033
034    public static final String XP_KAFKA_CONFIG = "kafkaConfig";
035
036    public static final int APPLICATION_STARTED_ORDER = -600;
037
038    protected static final String DEFAULT_BOOTSTRAP_SERVERS = "DEFAULT_TEST";
039
040    @Override
041    public int getApplicationStartedOrder() {
042        // since there is no dependencies, let's start before main nuxeo core services
043        return APPLICATION_STARTED_ORDER;
044    }
045
046    @Override
047    public Set<String> listConfigNames() {
048        return getDescriptors(XP_KAFKA_CONFIG).stream()
049                                              .map(Descriptor::getId)
050                                              .collect(Collectors.toSet());
051    }
052
053    @Deprecated
054    @Override
055    public String getZkServers(String configName) {
056        return getDescriptor(configName).zkServers;
057    }
058
059    @Override
060    public Properties getProducerProperties(String configName) {
061        Properties ret = getDescriptor(configName).producerProperties.properties;
062        if (DEFAULT_BOOTSTRAP_SERVERS.equals(ret.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
063            ret.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaUtils.getBootstrapServers());
064        }
065        return ret;
066    }
067
068    @Override
069    public Properties getConsumerProperties(String configName) {
070        Properties ret = getDescriptor(configName).consumerProperties.properties;
071        if (DEFAULT_BOOTSTRAP_SERVERS.equals(ret.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
072            ret.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaUtils.getBootstrapServers());
073        }
074        return ret;
075    }
076
077    @Override
078    public String getTopicPrefix(String configName) {
079        KafkaConfigDescriptor config = getDescriptor(configName);
080        String ret = config.topicPrefix == null ? "" : config.topicPrefix;
081        if (config.randomPrefix) {
082            ret += System.currentTimeMillis() + "-";
083        }
084        return ret;
085    }
086
087    protected KafkaConfigDescriptor getDescriptor(String configName) {
088        KafkaConfigDescriptor descriptor = getDescriptor(XP_KAFKA_CONFIG, configName);
089        if (descriptor == null) {
090            throw new IllegalArgumentException("Unknown configuration name: " + configName);
091        }
092        return descriptor;
093    }
094
095}