001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 *
017 * Contributors:
018 *     anechaev
019 */
020package org.nuxeo.runtime.kafka;
021
022import java.util.HashMap;
023import java.util.Map;
024import java.util.Properties;
025import java.util.Set;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.kafka.clients.consumer.ConsumerConfig;
030import org.apache.kafka.clients.producer.ProducerConfig;
031import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
032import org.nuxeo.runtime.model.ComponentContext;
033import org.nuxeo.runtime.model.ComponentInstance;
034import org.nuxeo.runtime.model.DefaultComponent;
035
036public class KafkaConfigServiceImpl extends DefaultComponent implements KafkaConfigService {
037    public static final String KAFKA_CONFIG_XP = "kafkaConfig";
038
039    public static final int APPLICATION_STARTED_ORDER = -600;
040
041    private static final Log log = LogFactory.getLog(KafkaConfigServiceImpl.class);
042
043    protected static final String DEFAULT_BOOTSTRAP_SERVERS = "DEFAULT_TEST";
044
045    protected final Map<String, KafkaConfigDescriptor> configs = new HashMap<>();
046
047    @Override
048    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
049        if (extensionPoint.equals(KAFKA_CONFIG_XP)) {
050            KafkaConfigDescriptor descriptor = (KafkaConfigDescriptor) contribution;
051            configs.put(descriptor.name, descriptor);
052            log.info(String.format("Register Kafka contribution: %s", descriptor.name));
053        }
054    }
055
056    @Override
057    public int getApplicationStartedOrder() {
058        // since there is no dependencies, let's start before main nuxeo core services
059        return APPLICATION_STARTED_ORDER;
060    }
061
062    @Override
063    public void deactivate(ComponentContext context) {
064        super.deactivate(context);
065        log.debug("Deactivating service");
066    }
067
068    @Override
069    public void activate(ComponentContext context) {
070        super.activate(context);
071        log.debug("Activating service");
072    }
073
074    @Override
075    public Set<String> listConfigNames() {
076        return configs.keySet();
077    }
078
079    @Deprecated
080    @Override
081    public String getZkServers(String configName) {
082        checkConfigName(configName);
083        return configs.get(configName).zkServers;
084    }
085
086    protected void checkConfigName(String configName) {
087        if (!configs.containsKey(configName)) {
088            throw new IllegalArgumentException("Unknown configuration name: " + configName);
089        }
090    }
091
092    @Override
093    public Properties getProducerProperties(String configName) {
094        checkConfigName(configName);
095        Properties ret = configs.get(configName).getProducerProperties();
096        if (DEFAULT_BOOTSTRAP_SERVERS.equals(ret.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
097            ret.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaUtils.getBootstrapServers());
098        }
099        return ret;
100    }
101
102    @Override
103    public Properties getConsumerProperties(String configName) {
104        checkConfigName(configName);
105        Properties ret = configs.get(configName).getConsumerProperties();
106        if (DEFAULT_BOOTSTRAP_SERVERS.equals(ret.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
107            ret.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaUtils.getBootstrapServers());
108        }
109        return ret;
110    }
111
112    @Override
113    public String getTopicPrefix(String configName) {
114        checkConfigName(configName);
115        return configs.get(configName).getTopicPrefix();
116    }
117}