001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import java.io.Externalizable; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.List; 025import java.util.Map; 026import java.util.Properties; 027import java.util.stream.Collectors; 028 029import org.apache.kafka.clients.admin.AdminClientConfig; 030import org.apache.kafka.clients.consumer.ConsumerConfig; 031import org.apache.kafka.clients.consumer.KafkaConsumer; 032import org.apache.kafka.clients.consumer.OffsetAndMetadata; 033import org.apache.kafka.clients.producer.ProducerConfig; 034import org.apache.kafka.common.TopicPartition; 035import org.apache.kafka.common.utils.Bytes; 036import org.nuxeo.lib.stream.log.LogLag; 037import org.nuxeo.lib.stream.log.LogPartition; 038import org.nuxeo.lib.stream.log.LogTailer; 039import org.nuxeo.lib.stream.log.RebalanceListener; 040import org.nuxeo.lib.stream.log.internals.AbstractLogManager; 041import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 042 043/** 044 * @since 9.3 045 */ 046public class KafkaLogManager extends AbstractLogManager { 047 public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable"; 048 049 public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor"; 050 051 protected final KafkaUtils kUtils; 052 053 protected final Properties producerProperties; 054 055 protected final Properties consumerProperties; 056 057 protected final Properties adminProperties; 058 059 protected final String prefix; 060 061 protected final short defaultReplicationFactor; 062 063 protected final boolean disableSubscribe; 064 065 public KafkaLogManager(String zkServers, Properties producerProperties, Properties consumerProperties) { 066 this(zkServers, null, producerProperties, consumerProperties); 067 } 068 069 public KafkaLogManager(String zkServers, String topicPrefix, Properties producerProperties, 070 Properties consumerProperties) { 071 this.prefix = (topicPrefix != null) ? topicPrefix : ""; 072 this.kUtils = new KafkaUtils(zkServers); 073 disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false")); 074 defaultReplicationFactor = Short.parseShort( 075 producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1")); 076 this.producerProperties = normalizeProducerProperties(producerProperties); 077 this.consumerProperties = normalizeConsumerProperties(consumerProperties); 078 this.adminProperties = createAdminProperties(producerProperties, consumerProperties); 079 } 080 081 protected String getTopicName(String name) { 082 return prefix + name; 083 } 084 085 protected String getNameFromTopic(String topic) { 086 if (!topic.startsWith(prefix)) { 087 throw new IllegalArgumentException(String.format("topic %s with invalid prefix %s", topic, prefix)); 088 } 089 return topic.substring(prefix.length()); 090 } 091 092 @Override 093 public void create(String name, int size) { 094 kUtils.createTopic(getAdminProperties(), getTopicName(name), size, defaultReplicationFactor); 095 } 096 097 @Override 098 public boolean exists(String name) { 099 return kUtils.topicExists(getTopicName(name)); 100 } 101 102 @Override 103 public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) { 104 return KafkaLogAppender.open(getTopicName(name), name, producerProperties, consumerProperties); 105 } 106 107 @Override 108 protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, 109 String group) { 110 partitions.forEach(this::checkValidPartition); 111 return KafkaLogTailer.createAndAssign(prefix, partitions, group, (Properties) consumerProperties.clone()); 112 } 113 114 protected void checkValidPartition(LogPartition partition) { 115 int partitions = kUtils.getNumberOfPartitions(getAdminProperties(), getTopicName(partition.name())); 116 if (partition.partition() >= partitions) { 117 throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions); 118 } 119 } 120 121 public Properties getProducerProperties() { 122 return producerProperties; 123 } 124 125 public Properties getConsumerProperties() { 126 return consumerProperties; 127 } 128 129 public Properties getAdminProperties() { 130 return adminProperties; 131 } 132 133 @Override 134 public void close() { 135 super.close(); 136 if (kUtils != null) { 137 kUtils.close(); 138 } 139 } 140 141 @Override 142 public boolean supportSubscribe() { 143 return !disableSubscribe; 144 } 145 146 @Override 147 protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, 148 RebalanceListener listener) { 149 return KafkaLogTailer.createAndSubscribe(prefix, names, group, (Properties) consumerProperties.clone(), 150 listener); 151 } 152 153 protected Properties normalizeProducerProperties(Properties producerProperties) { 154 Properties ret; 155 if (producerProperties != null) { 156 ret = (Properties) producerProperties.clone(); 157 } else { 158 ret = new Properties(); 159 } 160 ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 161 ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer"); 162 ret.remove(DEFAULT_REPLICATION_FACTOR_PROP); 163 return ret; 164 } 165 166 protected Properties normalizeConsumerProperties(Properties consumerProperties) { 167 Properties ret; 168 if (consumerProperties != null) { 169 ret = (Properties) consumerProperties.clone(); 170 } else { 171 ret = new Properties(); 172 } 173 ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 174 "org.apache.kafka.common.serialization.StringDeserializer"); 175 ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 176 "org.apache.kafka.common.serialization.BytesDeserializer"); 177 ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 178 ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 179 ret.remove(DISABLE_SUBSCRIBE_PROP); 180 return ret; 181 } 182 183 protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) { 184 Properties ret = new Properties(); 185 ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 186 producerProperties.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 187 consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))); 188 return ret; 189 } 190 191 @Override 192 public List<LogLag> getLagPerPartition(String name, String group) { 193 Properties props = (Properties) consumerProperties.clone(); 194 props.put(ConsumerConfig.GROUP_ID_CONFIG, group); 195 try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) { 196 List<TopicPartition> topicPartitions = consumer.partitionsFor(getTopicName(name)) 197 .stream() 198 .map(meta -> new TopicPartition(meta.topic(), 199 meta.partition())) 200 .collect(Collectors.toList()); 201 LogLag[] ret = new LogLag[topicPartitions.size()]; 202 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions); 203 for (TopicPartition topicPartition : topicPartitions) { 204 long committedOffset = 0L; 205 OffsetAndMetadata committed = consumer.committed(topicPartition); 206 if (committed != null) { 207 committedOffset = committed.offset(); 208 } 209 Long endOffset = endOffsets.get(topicPartition); 210 if (endOffset == null) { 211 endOffset = 0L; 212 } 213 ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset); 214 } 215 return Arrays.asList(ret); 216 } 217 } 218 219 @Override 220 public List<String> listAll() { 221 return kUtils.listTopics().stream().filter(name -> name.startsWith(prefix)).map(this::getNameFromTopic).collect( 222 Collectors.toList()); 223 } 224 225 @Override 226 public String toString() { 227 return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties=" 228 + consumerProperties + ", prefix='" + prefix + '\'' + '}'; 229 } 230 231 @Override 232 public List<String> listConsumerGroups(String name) { 233 String topic = getTopicName(name); 234 if (!exists(name)) { 235 throw new IllegalArgumentException("Unknown Log: " + name); 236 } 237 return kUtils.listConsumers(getProducerProperties(), topic); 238 } 239 240}