/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager whose Logs are Kafka topics. Log names and consumer group names are translated to their Kafka
 * counterparts through a {@link KafkaNamespace} built from the configured prefix.
 *
 * @since 9.3
 */
public class KafkaLogManager extends AbstractLogManager {
    /**
     * Consumer property read by the constructor to decide whether {@link #supportSubscribe()} returns {@code false}.
     * It is stripped from the consumer properties kept by this manager.
     */
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

    /**
     * Producer property read by the constructor to get the replication factor used by {@link #create(String, int)}.
     * It is stripped from the producer properties kept by this manager.
     */
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";

    protected final KafkaUtils kUtils;

    protected final Properties producerProperties;

    protected final Properties consumerProperties;

    protected final Properties adminProperties;

    protected final String prefix;

    protected final short defaultReplicationFactor;

    protected final boolean disableSubscribe;

    protected final KafkaNamespace ns;

    /**
     * @param zkServers ignored
     * @param prefix the namespace prefix, {@code null} is treated as an empty prefix
     * @param producerProperties the Kafka producer properties
     * @param consumerProperties the Kafka consumer properties
     * @deprecated since 10.2, zookeeper is not needed anymore, you need to remove the zkServers parameter.
     */
    @Deprecated
    public KafkaLogManager(String zkServers, String prefix, Properties producerProperties,
            Properties consumerProperties) {
        this(prefix, producerProperties, consumerProperties);
    }

    /**
     * Creates a manager from producer and consumer properties.
     * <p>
     * The {@value #DISABLE_SUBSCRIBE_PROP} consumer property (default {@code "false"}) and the
     * {@value #DEFAULT_REPLICATION_FACTOR_PROP} producer property (default {@code "1"}) are read here; the remaining
     * properties are normalized and kept for the Kafka clients.
     *
     * @param prefix the namespace prefix, {@code null} is treated as an empty prefix
     * @param producerProperties the Kafka producer properties, {@code null} is treated as empty
     * @param consumerProperties the Kafka consumer properties, {@code null} is treated as empty
     * @throws NumberFormatException if the replication factor property is not a valid {@code short}
     * @throws IllegalArgumentException if no bootstrap servers can be found in either set of properties
     * @since 10.2
     */
    public KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties) {
        this.prefix = (prefix != null) ? prefix : "";
        this.ns = new KafkaNamespace(this.prefix);
        // The normalize* helpers accept null, so the constructor must not dereference null properties before them.
        Properties rawProducer = (producerProperties != null) ? producerProperties : new Properties();
        Properties rawConsumer = (consumerProperties != null) ? consumerProperties : new Properties();
        disableSubscribe = Boolean.parseBoolean(rawConsumer.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
        defaultReplicationFactor = Short.parseShort(rawProducer.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
        this.producerProperties = normalizeProducerProperties(rawProducer);
        this.consumerProperties = normalizeConsumerProperties(rawConsumer);
        this.adminProperties = createAdminProperties(rawProducer, rawConsumer);
        this.kUtils = new KafkaUtils(adminProperties);
    }

    /**
     * Creates the topic matching the Log {@code name} with {@code size} partitions and the default replication
     * factor.
     */
    @Override
    public void create(String name, int size) {
        kUtils.createTopic(ns.getTopicName(name), size, defaultReplicationFactor);
    }

    /**
     * Returns the partition count reported by {@link KafkaUtils} for the topic matching the Log {@code name}.
     */
    @Override
    protected int getSize(String name) {
        return kUtils.partitions(ns.getTopicName(name));
    }

    /**
     * Returns {@code true} when the topic matching the Log {@code name} exists.
     */
    @Override
    public boolean exists(String name) {
        return kUtils.topicExists(ns.getTopicName(name));
    }

    /**
     * Opens a Kafka appender on the Log {@code name} using this manager's producer and consumer properties.
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) {
        return KafkaLogAppender.open(codec, ns, name, producerProperties, consumerProperties);
    }

    /**
     * Creates a tailer assigned to the given partitions, each partition is validated first.
     *
     * @throws IllegalArgumentException if one of the partitions is out of bound
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group,
            Codec<M> codec) {
        partitions.forEach(this::checkValidPartition);
        // The tailer gets its own copy of the consumer properties.
        return KafkaLogTailer.createAndAssign(codec, ns, partitions, group, (Properties) consumerProperties.clone());
    }

    /**
     * Checks that the partition index is within the bounds of the underlying topic.
     *
     * @param partition the Log partition to validate
     * @throws IllegalArgumentException if the index is negative or not lower than the number of partitions
     */
    protected void checkValidPartition(LogPartition partition) {
        int partitions = kUtils.getNumberOfPartitions(ns.getTopicName(partition.name()));
        int index = partition.partition();
        if (index < 0 || index >= partitions) {
            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
        }
    }

    /**
     * Returns the normalized producer properties held by this manager (the instance itself, not a copy).
     */
    public Properties getProducerProperties() {
        return producerProperties;
    }

    /**
     * Returns the normalized consumer properties held by this manager (the instance itself, not a copy).
     */
    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    /**
     * Returns the admin client properties held by this manager (the instance itself, not a copy).
     */
    public Properties getAdminProperties() {
        return adminProperties;
    }

    /**
     * Closes what the parent class manages, then the Kafka utils.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            // Release the Kafka utils even when the parent close fails.
            if (kUtils != null) {
                kUtils.close();
            }
        }
    }

    /**
     * Returns {@code false} only when subscribe has been disabled with the {@value #DISABLE_SUBSCRIBE_PROP} property.
     */
    @Override
    public boolean supportSubscribe() {
        return !disableSubscribe;
    }

    /**
     * Creates a tailer subscribed to the given Log names for the consumer {@code group}.
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener, Codec<M> codec) {
        // The tailer gets its own copy of the consumer properties.
        return KafkaLogTailer.createAndSubscribe(codec, ns, names, group, (Properties) consumerProperties.clone(),
                listener);
    }

    /**
     * Returns a copy of the given producer properties with the key/value serializers forced and the
     * {@value #DEFAULT_REPLICATION_FACTOR_PROP} entry removed.
     *
     * @param producerProperties the source properties, may be {@code null}
     * @return a new {@link Properties} instance, the argument is left untouched
     */
    protected Properties normalizeProducerProperties(Properties producerProperties) {
        Properties ret;
        if (producerProperties != null) {
            ret = (Properties) producerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return ret;
    }

    /**
     * Returns a copy of the given consumer properties with the key/value deserializers forced, auto commit disabled,
     * the offset reset policy set to {@code "earliest"} and the {@value #DISABLE_SUBSCRIBE_PROP} entry removed.
     *
     * @param consumerProperties the source properties, may be {@code null}
     * @return a new {@link Properties} instance, the argument is left untouched
     */
    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
        Properties ret;
        if (consumerProperties != null) {
            ret = (Properties) consumerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.BytesDeserializer");
        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ret.remove(DISABLE_SUBSCRIBE_PROP);
        return ret;
    }

    /**
     * Builds the admin client properties, which only hold the bootstrap servers. The value is taken from the producer
     * properties and falls back on the consumer properties.
     *
     * @param producerProperties the producer properties, may be {@code null}
     * @param consumerProperties the consumer properties, may be {@code null}
     * @return a new {@link Properties} instance
     * @throws IllegalArgumentException if neither set of properties defines the bootstrap servers
     */
    protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) {
        Object servers = null;
        if (producerProperties != null) {
            servers = producerProperties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        }
        if (servers == null && consumerProperties != null) {
            servers = consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        }
        if (servers == null) {
            // Properties rejects null values with a bare NullPointerException, fail with an explicit message instead.
            throw new IllegalArgumentException("Missing " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
                    + " in producer or consumer properties");
        }
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        return ret;
    }

    /**
     * Returns the lag of the consumer {@code group} for each partition of the Log {@code name}, indexed by partition.
     * A partition without committed offset is reported with a committed offset of 0, a partition without known end
     * offset is reported with an end offset of 0.
     *
     * @return one {@link LogLag} per partition, or an empty list when no partition metadata is available
     */
    // NOTE(review): presumably synchronized because every call uses the same fixed client id "lag" - confirm.
    @Override
    public synchronized List<LogLag> getLagPerPartition(String name, String group) {
        Properties props = (Properties) consumerProperties.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, prefix + group);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "lag");
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(ns.getTopicName(name));
            if (partitionInfos == null) {
                // Some Kafka client versions may return null instead of an empty list for an unknown topic.
                return Collections.emptyList();
            }
            List<TopicPartition> topicPartitions = partitionInfos.stream()
                                                                 .map(meta -> new TopicPartition(meta.topic(),
                                                                         meta.partition()))
                                                                 .collect(Collectors.toList());
            LogLag[] ret = new LogLag[topicPartitions.size()];
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                long committedOffset = (committed != null) ? committed.offset() : 0L;
                Long end = endOffsets.get(topicPartition);
                long endOffset = (end != null) ? end : 0L;
                // The result is indexed by partition number.
                ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
            }
            return Arrays.asList(ret);
        }
    }

    /**
     * Lists the Log names of all the topics whose name starts with the prefix of this manager.
     */
    @Override
    public List<String> listAll() {
        return kUtils.listTopics()
                     .stream()
                     .filter(name -> name.startsWith(prefix))
                     .map(ns::getLogName)
                     .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties="
                + consumerProperties + ", prefix='" + prefix + '\'' + '}';
    }

    /**
     * Lists the consumer groups of the Log {@code name}, restricted to the Kafka groups whose name starts with the
     * prefix of this manager.
     *
     * @throws IllegalArgumentException if the Log does not exist
     */
    @Override
    public List<String> listConsumerGroups(String name) {
        String topic = ns.getTopicName(name);
        if (!exists(name)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        return kUtils.listConsumers(topic)
                     .stream()
                     .filter(group -> group.startsWith(prefix))
                     .map(ns::getGroup)
                     .collect(Collectors.toList());
    }

}