001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.util.Properties; 024 025import javax.xml.parsers.DocumentBuilder; 026import javax.xml.parsers.DocumentBuilderFactory; 027import javax.xml.parsers.ParserConfigurationException; 028 029import org.apache.kafka.clients.producer.ProducerConfig; 030import org.nuxeo.lib.stream.log.kafka.KafkaUtils; 031import org.w3c.dom.Document; 032import org.w3c.dom.Node; 033import org.w3c.dom.NodeList; 034import org.xml.sax.SAXException; 035 036/** 037 * Parse an xml file describing Kafka configurations, the format is the one used by KafkaConfigDescriptor. We can not 038 * use the Nuxeo descriptor directly because we are in a library without Nuxeo dependency. 039 * 040 * @since 9.10 041 */ 042public class KafkaConfigParser { 043 044 protected static final String DEFAULT_BOOTSTRAP_SERVERS = "DEFAULT_TEST"; 045 046 protected Properties producerProperties; 047 048 protected Properties consumerProperties; 049 050 protected String prefix; 051 052 public KafkaConfigParser(Path path, String configName) { 053 try { 054 DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); 055 DocumentBuilder builder = factory.newDocumentBuilder(); 056 Document document = builder.parse(path.toFile()); 057 NodeList nodes = document.getElementsByTagName("kafkaConfig"); 058 for (int i = 0; i < nodes.getLength(); i++) { 059 Node node = nodes.item(i); 060 String name = node.getAttributes().getNamedItem("name").getNodeValue(); 061 if (configName.equals(name)) { 062 parseConfig(node); 063 return; 064 } 065 } 066 } catch (SAXException | IOException | ParserConfigurationException e) { 067 throw new IllegalArgumentException("Invalid Kafka config file: " + path, e); 068 } 069 throw new IllegalArgumentException(String.format("Config: %s not found in file: %s", configName, path)); 070 } 071 072 protected void parseConfig(Node node) { 073 prefix = node.getAttributes().getNamedItem("topicPrefix").getNodeValue(); 074 NodeList children = node.getChildNodes(); 075 for (int i = 0; i < children.getLength(); i++) { 076 Node child = children.item(i); 077 if ("producer".equals(child.getNodeName())) { 078 producerProperties = decodeProperties(child); 079 } else if ("consumer".equals(child.getNodeName())) { 080 consumerProperties = decodeProperties(child); 081 } 082 } 083 } 084 085 protected Properties decodeProperties(Node node) { 086 NodeList children = node.getChildNodes(); 087 Properties ret = new Properties(); 088 for (int i = 0; i < children.getLength(); i++) { 089 Node child = children.item(i); 090 if ("property".equals(child.getNodeName())) { 091 String name = child.getAttributes().getNamedItem("name").getNodeValue(); 092 String value = child.getTextContent(); 093 if (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.equals(name) && DEFAULT_BOOTSTRAP_SERVERS.equals(value)) { 094 ret.put(name, KafkaUtils.getBootstrapServers()); 095 } else { 096 ret.put(name, value); 097 } 098 } 099 } 100 return ret; 101 } 102 103 public Properties getProducerProperties() { 104 return producerProperties; 105 } 106 107 public Properties getConsumerProperties() { 108 return consumerProperties; 109 } 110 111 public String getPrefix() { 112 return prefix; 113 } 114}