001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.runtime.kafka;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025
026import org.apache.logging.log4j.LogManager;
027import org.apache.logging.log4j.Logger;
028import org.nuxeo.common.xmap.XMap;
029import org.nuxeo.launcher.config.ConfigurationException;
030import org.nuxeo.launcher.config.ConfigurationGenerator;
031import org.nuxeo.launcher.config.backingservices.BackingChecker;
032import org.nuxeo.lib.stream.log.Name;
033import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
034
035/**
036 * @since 11.3
037 */
038public class KafkaChecker implements BackingChecker {
039    private static final Logger log = LogManager.getLogger(KafkaChecker.class);
040
041    private static final String KAFKA_ENABLED_PROP = "kafka.enabled";
042
043    private static final String CONFIG_NAME = "kafka-config.xml";
044
045    @Override
046    public boolean accepts(ConfigurationGenerator cg) {
047        // not using Boolean.parseValue on purpose, only 'true' must trigger the checker
048        if (!"true".equals(cg.getUserConfig().getProperty(KAFKA_ENABLED_PROP))) {
049            log.debug("Checker skipped because Kafka is disabled");
050            return false;
051        }
052        return true;
053    }
054
055    @Override
056    public void check(ConfigurationGenerator cg) throws ConfigurationException {
057        KafkaConfigDescriptor config = getConfig(cg);
058        try (KafkaLogManager manager = new KafkaLogManager(config.topicPrefix, config.producerProperties.properties,
059                config.consumerProperties.properties)) {
060            manager.exists(Name.ofUrn("input/null"));
061        } catch (Exception e) {
062            throw new ConfigurationException("Unable to reach Kafka using: " + config.producerProperties.properties, e);
063        }
064    }
065
066    protected KafkaConfigDescriptor getConfig(ConfigurationGenerator cg) throws ConfigurationException {
067        File configFile = new File(cg.getConfigDir(), CONFIG_NAME);
068        if (!configFile.exists()) {
069            throw new ConfigurationException("Cannot find Kafka configuration: " + CONFIG_NAME);
070        }
071        XMap xmap = new XMap();
072        xmap.register(KafkaConfigDescriptor.class);
073        try (InputStream inStream = new FileInputStream(configFile)) {
074            Object[] nodes = xmap.loadAll(inStream);
075            for (Object node : nodes) {
076                if (node != null) {
077                    return (KafkaConfigDescriptor) node;
078                }
079            }
080            throw new ConfigurationException("No KafkaConfigDescriptor found in " + configFile.getAbsolutePath());
081        } catch (IOException e) {
082            throw new ConfigurationException(
083                    "Failed to load KafkaConfigDescriptor from " + configFile.getAbsolutePath(), e);
084        }
085    }
086}