001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import java.util.List; 022import java.util.Map; 023import java.util.Properties; 024 025import org.apache.kafka.clients.consumer.ConsumerConfig; 026import org.apache.kafka.clients.producer.ProducerConfig; 027import org.nuxeo.lib.stream.log.AbstractLogConfig; 028import org.nuxeo.lib.stream.log.NameResolver; 029 030/** 031 * @since 11.1 032 */ 033public class KafkaLogConfig extends AbstractLogConfig { 034 035 public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable"; 036 037 public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor"; 038 039 protected final Properties adminProperties; 040 041 protected final Properties producerProperties; 042 043 protected final Properties consumerProperties; 044 045 protected final short defaultReplicationFactor; 046 047 protected final Boolean disableSubscribe; 048 049 protected final NameResolver resolver; 050 051 protected final String name; 052 053 public KafkaLogConfig(String name, boolean defaultConfig, List<String> patterns, String prefix, 054 Properties adminProperties, 055 Properties producerProperties, Properties consumerProperties) { 056 super(defaultConfig, patterns); 057 this.name = name; 058 resolver = new NameResolver(prefix); 059 this.producerProperties = normalizeProducerProperties(producerProperties); 060 this.consumerProperties = normalizeConsumerProperties(consumerProperties); 061 if (adminProperties == null || adminProperties.isEmpty()) { 062 this.adminProperties = createAdminProperties(this.producerProperties); 063 } else { 064 this.adminProperties = normalizeAdminProperties(adminProperties); 065 } 066 disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false")); 067 defaultReplicationFactor = Short.parseShort( 068 producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1")); 069 } 070 071 public short getReplicatorFactor() { 072 return defaultReplicationFactor; 073 } 074 075 public Boolean getDisableSubscribe() { 076 return disableSubscribe; 077 } 078 079 public NameResolver getResolver() { 080 return resolver; 081 } 082 083 public Properties getAdminProperties() { 084 return adminProperties; 085 } 086 087 public Properties getProducerProperties() { 088 return producerProperties; 089 } 090 091 public Properties getConsumerProperties() { 092 return consumerProperties; 093 } 094 095 protected Properties normalizeAdminProperties(Properties adminProperties) { 096 // anything to remove? 097 return (Properties) adminProperties.clone(); 098 } 099 100 protected Properties normalizeProducerProperties(Properties producerProperties) { 101 Properties ret; 102 if (producerProperties != null) { 103 ret = (Properties) producerProperties.clone(); 104 } else { 105 ret = new Properties(); 106 } 107 try { 108 ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 109 Class.forName("org.apache.kafka.common.serialization.StringSerializer")); 110 ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 111 Class.forName("org.apache.kafka.common.serialization.BytesSerializer")); 112 } catch (ClassNotFoundException e) { 113 throw new IllegalStateException(e); 114 } 115 ret.remove(DEFAULT_REPLICATION_FACTOR_PROP); 116 return ret; 117 } 118 119 protected Properties normalizeConsumerProperties(Properties consumerProperties) { 120 Properties ret; 121 if (consumerProperties != null) { 122 ret = (Properties) consumerProperties.clone(); 123 } else { 124 ret = new Properties(); 125 } 126 try { 127 ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 128 Class.forName("org.apache.kafka.common.serialization.StringDeserializer")); 129 ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 130 Class.forName("org.apache.kafka.common.serialization.BytesDeserializer")); 131 } catch (ClassNotFoundException e) { 132 throw new IllegalStateException(e); 133 } 134 ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 135 ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 136 ret.remove(DISABLE_SUBSCRIBE_PROP); 137 return ret; 138 } 139 140 protected Properties createAdminProperties(Properties producerProperties) { 141 Properties ret = new Properties(); 142 for (Map.Entry<Object, Object> prop : producerProperties.entrySet()) { 143 switch (prop.getKey().toString()) { 144 case ProducerConfig.ACKS_CONFIG: 145 case ProducerConfig.BATCH_SIZE_CONFIG: 146 case ProducerConfig.BUFFER_MEMORY_CONFIG: 147 case ProducerConfig.COMPRESSION_TYPE_CONFIG: 148 case ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: 149 case ProducerConfig.LINGER_MS_CONFIG: 150 case ProducerConfig.MAX_BLOCK_MS_CONFIG: 151 case ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: 152 case ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG: 153 case DEFAULT_REPLICATION_FACTOR_PROP: 154 // Skip non admin config properties to avoid warning on unused properties 155 break; 156 default: 157 ret.put(prop.getKey(), prop.getValue()); 158 } 159 } 160 return ret; 161 } 162 163 @Override 164 public String toString() { 165 return "KafkaLogConfig{name='" + name + "', resolver='" + resolver + '\'' + ", adminProperties=" 166 + filterDisplayedProperties(adminProperties) + ", producerProperties=" 167 + filterDisplayedProperties(producerProperties) + ", consumerProperties=" 168 + filterDisplayedProperties(consumerProperties) + ", defaultReplicationFactor=" 169 + defaultReplicationFactor + ", disableSubscribe=" + disableSubscribe + '}'; 170 } 171 172 protected String filterDisplayedProperties(Properties properties) { 173 String ret = properties.toString(); 174 if (!ret.contains("password")) { 175 return ret; 176 } 177 return ret.replaceAll("password=.[^\\\"\\;\\,\\ ]*", "password=****"); 178 } 179}