001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.runtime.kafka; 020 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.IOException; 024import java.io.InputStream; 025 026import org.apache.logging.log4j.LogManager; 027import org.apache.logging.log4j.Logger; 028import org.nuxeo.common.xmap.XMap; 029import org.nuxeo.launcher.config.ConfigurationException; 030import org.nuxeo.launcher.config.ConfigurationGenerator; 031import org.nuxeo.launcher.config.backingservices.BackingChecker; 032import org.nuxeo.lib.stream.log.Name; 033import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 034 035/** 036 * @since 11.3 037 */ 038public class KafkaChecker implements BackingChecker { 039 private static final Logger log = LogManager.getLogger(KafkaChecker.class); 040 041 private static final String KAFKA_ENABLED_PROP = "kafka.enabled"; 042 043 private static final String CONFIG_NAME = "kafka-config.xml"; 044 045 @Override 046 public boolean accepts(ConfigurationGenerator cg) { 047 // not using Boolean.parseValue on purpose, only 'true' must trigger the checker 048 if (!"true".equals(cg.getUserConfig().getProperty(KAFKA_ENABLED_PROP))) { 049 log.debug("Checker skipped because Kafka is disabled"); 050 return false; 051 } 052 return true; 053 } 054 055 @Override 056 public void check(ConfigurationGenerator cg) throws ConfigurationException { 057 KafkaConfigDescriptor config = getConfig(cg); 058 try (KafkaLogManager manager = new KafkaLogManager(config.topicPrefix, config.producerProperties.properties, 059 config.consumerProperties.properties)) { 060 manager.exists(Name.ofUrn("input/null")); 061 } catch (Exception e) { 062 throw new ConfigurationException("Unable to reach Kafka using: " + config.producerProperties.properties, e); 063 } 064 } 065 066 protected KafkaConfigDescriptor getConfig(ConfigurationGenerator cg) throws ConfigurationException { 067 File configFile = new File(cg.getConfigDir(), CONFIG_NAME); 068 if (!configFile.exists()) { 069 throw new ConfigurationException("Cannot find Kafka configuration: " + CONFIG_NAME); 070 } 071 XMap xmap = new XMap(); 072 xmap.register(KafkaConfigDescriptor.class); 073 try (InputStream inStream = new FileInputStream(configFile)) { 074 Object[] nodes = xmap.loadAll(inStream); 075 for (Object node : nodes) { 076 if (node != null) { 077 return (KafkaConfigDescriptor) node; 078 } 079 } 080 throw new ConfigurationException("No KafkaConfigDescriptor found in " + configFile.getAbsolutePath()); 081 } catch (IOException e) { 082 throw new ConfigurationException( 083 "Failed to load KafkaConfigDescriptor from " + configFile.getAbsolutePath(), e); 084 } 085 } 086}