001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools;
020
021import java.io.IOException;
022import java.nio.file.Path;
023import java.util.Properties;
024
025import javax.xml.parsers.DocumentBuilder;
026import javax.xml.parsers.DocumentBuilderFactory;
027import javax.xml.parsers.ParserConfigurationException;
028
029import org.w3c.dom.Document;
030import org.w3c.dom.Node;
031import org.w3c.dom.NodeList;
032import org.xml.sax.SAXException;
033
034/**
035 * Parse an xml file describing Kafka configurations, the format is the one used by KafkaConfigDescriptor. We can not
036 * use the Nuxeo descriptor directly because we are in a library without Nuxeo dependency.
037 *
038 * @since 9.10
039 */
040public class KafkaConfigParser {
041
042    protected String zkServers;
043
044    private Properties producerProperties;
045
046    private Properties consumerProperties;
047
048    private String prefix;
049
050    public KafkaConfigParser(Path path, String configName) {
051        try {
052            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
053            DocumentBuilder builder = factory.newDocumentBuilder();
054            Document document = builder.parse(path.toFile());
055            NodeList nodes = document.getElementsByTagName("kafkaConfig");
056            for (int i = 0; i < nodes.getLength(); i++) {
057                Node node = nodes.item(i);
058                String name = node.getAttributes().getNamedItem("name").getNodeValue();
059                if (configName.equals(name)) {
060                    parseConfig(node);
061                    return;
062                }
063            }
064        } catch (SAXException | IOException | ParserConfigurationException e) {
065            throw new IllegalArgumentException("Invalid Kafka config file: " + path, e);
066        }
067        throw new IllegalArgumentException(String.format("Config: %s not found in file: %s", configName, path));
068    }
069
070    protected void parseConfig(Node node) {
071        prefix = node.getAttributes().getNamedItem("topicPrefix").getNodeValue();
072        zkServers = node.getAttributes().getNamedItem("zkServers").getNodeValue();
073        NodeList children = node.getChildNodes();
074        for (int i = 0; i < children.getLength(); i++) {
075            Node child = children.item(i);
076            if ("producer".equals(child.getNodeName())) {
077                producerProperties = decodeProperties(child);
078            } else if ("consumer".equals(child.getNodeName())) {
079                consumerProperties = decodeProperties(child);
080            }
081        }
082    }
083
084    protected Properties decodeProperties(Node node) {
085        NodeList children = node.getChildNodes();
086        Properties ret = new Properties();
087        for (int i = 0; i < children.getLength(); i++) {
088            Node child = children.item(i);
089            if ("property".equals(child.getNodeName())) {
090                ret.put(child.getAttributes().getNamedItem("name").getNodeValue(), child.getTextContent());
091            }
092        }
093        return ret;
094    }
095
096    public String getZkServers() {
097        return zkServers;
098    }
099
100    public Properties getProducerProperties() {
101        return producerProperties;
102    }
103
104    public Properties getConsumerProperties() {
105        return consumerProperties;
106    }
107
108    public String getPrefix() {
109        return prefix;
110    }
111}