001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools;
020
021import java.io.IOException;
022import java.nio.file.Path;
023import java.util.Properties;
024
025import javax.xml.parsers.DocumentBuilder;
026import javax.xml.parsers.DocumentBuilderFactory;
027import javax.xml.parsers.ParserConfigurationException;
028
029import org.apache.kafka.clients.producer.ProducerConfig;
030import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
031import org.w3c.dom.Document;
032import org.w3c.dom.Node;
033import org.w3c.dom.NodeList;
034import org.xml.sax.SAXException;
035
036/**
037 * Parse an xml file describing Kafka configurations, the format is the one used by KafkaConfigDescriptor. We can not
038 * use the Nuxeo descriptor directly because we are in a library without Nuxeo dependency.
039 *
040 * @since 9.10
041 */
042public class KafkaConfigParser {
043
044    protected static final String DEFAULT_BOOTSTRAP_SERVERS = "DEFAULT_TEST";
045
046    protected Properties producerProperties;
047
048    protected Properties consumerProperties;
049
050    protected String prefix;
051
052    public KafkaConfigParser(Path path, String configName) {
053        try {
054            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
055            DocumentBuilder builder = factory.newDocumentBuilder();
056            Document document = builder.parse(path.toFile());
057            NodeList nodes = document.getElementsByTagName("kafkaConfig");
058            for (int i = 0; i < nodes.getLength(); i++) {
059                Node node = nodes.item(i);
060                String name = node.getAttributes().getNamedItem("name").getNodeValue();
061                if (configName.equals(name)) {
062                    parseConfig(node);
063                    return;
064                }
065            }
066        } catch (SAXException | IOException | ParserConfigurationException e) {
067            throw new IllegalArgumentException("Invalid Kafka config file: " + path, e);
068        }
069        throw new IllegalArgumentException(String.format("Config: %s not found in file: %s", configName, path));
070    }
071
072    protected void parseConfig(Node node) {
073        prefix = node.getAttributes().getNamedItem("topicPrefix").getNodeValue();
074        NodeList children = node.getChildNodes();
075        for (int i = 0; i < children.getLength(); i++) {
076            Node child = children.item(i);
077            if ("producer".equals(child.getNodeName())) {
078                producerProperties = decodeProperties(child);
079            } else if ("consumer".equals(child.getNodeName())) {
080                consumerProperties = decodeProperties(child);
081            }
082        }
083    }
084
085    protected Properties decodeProperties(Node node) {
086        NodeList children = node.getChildNodes();
087        Properties ret = new Properties();
088        for (int i = 0; i < children.getLength(); i++) {
089            Node child = children.item(i);
090            if ("property".equals(child.getNodeName())) {
091                String name = child.getAttributes().getNamedItem("name").getNodeValue();
092                String value = child.getTextContent();
093                if (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.equals(name) && DEFAULT_BOOTSTRAP_SERVERS.equals(value)) {
094                    ret.put(name, KafkaUtils.getBootstrapServers());
095                } else {
096                    ret.put(name, value);
097                }
098            }
099        }
100        return ret;
101    }
102
103    public Properties getProducerProperties() {
104        return producerProperties;
105    }
106
107    public Properties getConsumerProperties() {
108        return consumerProperties;
109    }
110
111    public String getPrefix() {
112        return prefix;
113    }
114}