001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools;
020
021import java.io.IOException;
022import java.nio.file.Path;
023import java.util.Properties;
024
025import javax.xml.parsers.DocumentBuilder;
026import javax.xml.parsers.DocumentBuilderFactory;
027import javax.xml.parsers.ParserConfigurationException;
028
029import org.apache.kafka.clients.producer.ProducerConfig;
030import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
031import org.w3c.dom.Document;
032import org.w3c.dom.Node;
033import org.w3c.dom.NodeList;
034import org.xml.sax.SAXException;
035
036/**
037 * Parse an xml file describing Kafka configurations, the format is the one used by KafkaConfigDescriptor. We can not
038 * use the Nuxeo descriptor directly because we are in a library without Nuxeo dependency.
039 *
040 * @since 9.10
041 */
042public class KafkaConfigParser {
043    protected static final String DEFAULT_ZK_SERVERS = "DEFAULT_TEST";
044
045    protected static final String DEFAULT_BOOTSTRAP_SERVERS = "DEFAULT_TEST";
046
047    protected String zkServers;
048
049    private Properties producerProperties;
050
051    private Properties consumerProperties;
052
053    private String prefix;
054
055    public KafkaConfigParser(Path path, String configName) {
056        try {
057            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
058            DocumentBuilder builder = factory.newDocumentBuilder();
059            Document document = builder.parse(path.toFile());
060            NodeList nodes = document.getElementsByTagName("kafkaConfig");
061            for (int i = 0; i < nodes.getLength(); i++) {
062                Node node = nodes.item(i);
063                String name = node.getAttributes().getNamedItem("name").getNodeValue();
064                if (configName.equals(name)) {
065                    parseConfig(node);
066                    return;
067                }
068            }
069        } catch (SAXException | IOException | ParserConfigurationException e) {
070            throw new IllegalArgumentException("Invalid Kafka config file: " + path, e);
071        }
072        throw new IllegalArgumentException(String.format("Config: %s not found in file: %s", configName, path));
073    }
074
075    protected void parseConfig(Node node) {
076        prefix = node.getAttributes().getNamedItem("topicPrefix").getNodeValue();
077        setZkServers(node.getAttributes().getNamedItem("zkServers").getNodeValue());
078        NodeList children = node.getChildNodes();
079        for (int i = 0; i < children.getLength(); i++) {
080            Node child = children.item(i);
081            if ("producer".equals(child.getNodeName())) {
082                producerProperties = decodeProperties(child);
083            } else if ("consumer".equals(child.getNodeName())) {
084                consumerProperties = decodeProperties(child);
085            }
086        }
087    }
088
089    protected Properties decodeProperties(Node node) {
090        NodeList children = node.getChildNodes();
091        Properties ret = new Properties();
092        for (int i = 0; i < children.getLength(); i++) {
093            Node child = children.item(i);
094            if ("property".equals(child.getNodeName())) {
095                String name = child.getAttributes().getNamedItem("name").getNodeValue();
096                String value = child.getTextContent();
097                if (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.equals(name) && DEFAULT_BOOTSTRAP_SERVERS.equals(value)) {
098                    ret.put(name, KafkaUtils.getBootstrapServers());
099                } else {
100                    ret.put(name, value);
101                }
102            }
103        }
104        return ret;
105    }
106
107    public String getZkServers() {
108        return zkServers;
109    }
110
111    public Properties getProducerProperties() {
112        return producerProperties;
113    }
114
115    public Properties getConsumerProperties() {
116        return consumerProperties;
117    }
118
119    public String getPrefix() {
120        return prefix;
121    }
122
123    public void setZkServers(String zkServers) {
124        if (DEFAULT_ZK_SERVERS.equals(zkServers)) {
125            this.zkServers = KafkaUtils.getZkServers();
126        } else {
127            this.zkServers = zkServers;
128        }
129    }
130}