001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.util.Properties; 024 025import javax.xml.parsers.DocumentBuilder; 026import javax.xml.parsers.DocumentBuilderFactory; 027import javax.xml.parsers.ParserConfigurationException; 028 029import org.w3c.dom.Document; 030import org.w3c.dom.Node; 031import org.w3c.dom.NodeList; 032import org.xml.sax.SAXException; 033 034/** 035 * Parse an xml file describing Kafka configurations, the format is the one used by KafkaConfigDescriptor. We can not 036 * use the Nuxeo descriptor directly because we are in a library without Nuxeo dependency. 037 * 038 * @since 9.10 039 */ 040public class KafkaConfigParser { 041 042 protected String zkServers; 043 044 private Properties producerProperties; 045 046 private Properties consumerProperties; 047 048 private String prefix; 049 050 public KafkaConfigParser(Path path, String configName) { 051 try { 052 DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); 053 DocumentBuilder builder = factory.newDocumentBuilder(); 054 Document document = builder.parse(path.toFile()); 055 NodeList nodes = document.getElementsByTagName("kafkaConfig"); 056 for (int i = 0; i < nodes.getLength(); i++) { 057 Node node = nodes.item(i); 058 String name = node.getAttributes().getNamedItem("name").getNodeValue(); 059 if (configName.equals(name)) { 060 parseConfig(node); 061 return; 062 } 063 } 064 } catch (SAXException | IOException | ParserConfigurationException e) { 065 throw new IllegalArgumentException("Invalid Kafka config file: " + path, e); 066 } 067 throw new IllegalArgumentException(String.format("Config: %s not found in file: %s", configName, path)); 068 } 069 070 protected void parseConfig(Node node) { 071 prefix = node.getAttributes().getNamedItem("topicPrefix").getNodeValue(); 072 zkServers = node.getAttributes().getNamedItem("zkServers").getNodeValue(); 073 NodeList children = node.getChildNodes(); 074 for (int i = 0; i < children.getLength(); i++) { 075 Node child = children.item(i); 076 if ("producer".equals(child.getNodeName())) { 077 producerProperties = decodeProperties(child); 078 } else if ("consumer".equals(child.getNodeName())) { 079 consumerProperties = decodeProperties(child); 080 } 081 } 082 } 083 084 protected Properties decodeProperties(Node node) { 085 NodeList children = node.getChildNodes(); 086 Properties ret = new Properties(); 087 for (int i = 0; i < children.getLength(); i++) { 088 Node child = children.item(i); 089 if ("property".equals(child.getNodeName())) { 090 ret.put(child.getAttributes().getNamedItem("name").getNodeValue(), child.getTextContent()); 091 } 092 } 093 return ret; 094 } 095 096 public String getZkServers() { 097 return zkServers; 098 } 099 100 public Properties getProducerProperties() { 101 return producerProperties; 102 } 103 104 public Properties getConsumerProperties() { 105 return consumerProperties; 106 } 107 108 public String getPrefix() { 109 return prefix; 110 } 111}