001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.util.List;
022import java.util.Map;
023import java.util.Properties;
024
025import org.apache.kafka.clients.consumer.ConsumerConfig;
026import org.apache.kafka.clients.producer.ProducerConfig;
027import org.nuxeo.lib.stream.log.AbstractLogConfig;
028import org.nuxeo.lib.stream.log.NameResolver;
029
030/**
031 * @since 11.1
032 */
033public class KafkaLogConfig extends AbstractLogConfig {
034
035    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
036
037    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";
038
039    protected final Properties adminProperties;
040
041    protected final Properties producerProperties;
042
043    protected final Properties consumerProperties;
044
045    protected final short defaultReplicationFactor;
046
047    protected final Boolean disableSubscribe;
048
049    protected final NameResolver resolver;
050
051    protected final String name;
052
053    public KafkaLogConfig(String name, boolean defaultConfig, List<String> patterns, String prefix,
054            Properties adminProperties,
055            Properties producerProperties, Properties consumerProperties) {
056        super(defaultConfig, patterns);
057        this.name = name;
058        resolver = new NameResolver(prefix);
059        this.producerProperties = normalizeProducerProperties(producerProperties);
060        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
061        if (adminProperties == null || adminProperties.isEmpty()) {
062            this.adminProperties = createAdminProperties(this.producerProperties);
063        } else {
064            this.adminProperties = normalizeAdminProperties(adminProperties);
065        }
066        disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
067        defaultReplicationFactor = Short.parseShort(
068                producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
069    }
070
071    public short getReplicatorFactor() {
072        return defaultReplicationFactor;
073    }
074
075    public Boolean getDisableSubscribe() {
076        return disableSubscribe;
077    }
078
079    public NameResolver getResolver() {
080        return resolver;
081    }
082
083    public Properties getAdminProperties() {
084        return adminProperties;
085    }
086
087    public Properties getProducerProperties() {
088        return producerProperties;
089    }
090
091    public Properties getConsumerProperties() {
092        return consumerProperties;
093    }
094
095    protected Properties normalizeAdminProperties(Properties adminProperties) {
096        // anything to remove?
097        return (Properties) adminProperties.clone();
098    }
099
100    protected Properties normalizeProducerProperties(Properties producerProperties) {
101        Properties ret;
102        if (producerProperties != null) {
103            ret = (Properties) producerProperties.clone();
104        } else {
105            ret = new Properties();
106        }
107        try {
108            ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
109                    Class.forName("org.apache.kafka.common.serialization.StringSerializer"));
110            ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
111                    Class.forName("org.apache.kafka.common.serialization.BytesSerializer"));
112        } catch (ClassNotFoundException e) {
113            throw new IllegalStateException(e);
114        }
115        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
116        return ret;
117    }
118
119    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
120        Properties ret;
121        if (consumerProperties != null) {
122            ret = (Properties) consumerProperties.clone();
123        } else {
124            ret = new Properties();
125        }
126        try {
127            ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
128                    Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
129            ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
130                    Class.forName("org.apache.kafka.common.serialization.BytesDeserializer"));
131        } catch (ClassNotFoundException e) {
132            throw new IllegalStateException(e);
133        }
134        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
135        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
136        ret.remove(DISABLE_SUBSCRIBE_PROP);
137        return ret;
138    }
139
140    protected Properties createAdminProperties(Properties producerProperties) {
141        Properties ret = new Properties();
142        for (Map.Entry<Object, Object> prop : producerProperties.entrySet()) {
143            switch (prop.getKey().toString()) {
144            case ProducerConfig.ACKS_CONFIG:
145            case ProducerConfig.BATCH_SIZE_CONFIG:
146            case ProducerConfig.BUFFER_MEMORY_CONFIG:
147            case ProducerConfig.COMPRESSION_TYPE_CONFIG:
148            case ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG:
149            case ProducerConfig.LINGER_MS_CONFIG:
150            case ProducerConfig.MAX_BLOCK_MS_CONFIG:
151            case ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG:
152            case ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG:
153            case DEFAULT_REPLICATION_FACTOR_PROP:
154                // Skip non admin config properties to avoid warning on unused properties
155                break;
156            default:
157                ret.put(prop.getKey(), prop.getValue());
158            }
159        }
160        return ret;
161    }
162
163    @Override
164    public String toString() {
165        return "KafkaLogConfig{name='" + name + "', resolver='" + resolver + '\'' + ", adminProperties="
166                + filterDisplayedProperties(adminProperties) + ", producerProperties="
167                + filterDisplayedProperties(producerProperties) + ", consumerProperties="
168                + filterDisplayedProperties(consumerProperties) + ", defaultReplicationFactor="
169                + defaultReplicationFactor + ", disableSubscribe=" + disableSubscribe + '}';
170    }
171
172    protected String filterDisplayedProperties(Properties properties) {
173        String ret = properties.toString();
174        if (!ret.contains("password")) {
175            return ret;
176        }
177        return ret.replaceAll("password=.[^\\\"\\;\\,\\ ]*", "password=****");
178    }
179}