/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager where each Log is mapped to a Kafka topic. Topic and consumer group names are derived from the Log and
 * group names through a {@link KafkaNamespace} built from the configured prefix.
 *
 * @since 9.3
 */
public class KafkaLogManager extends AbstractLogManager {
    /** Consumer property (not forwarded to Kafka) that disables the subscribe feature when set to {@code true}. */
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

    /** Producer property (not forwarded to Kafka) giving the replication factor used when creating topics. */
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";

    protected final KafkaUtils kUtils;

    protected final Properties producerProperties;

    protected final Properties consumerProperties;

    protected final Properties adminProperties;

    protected final String prefix;

    protected final short defaultReplicationFactor;

    protected final boolean disableSubscribe;

    protected final KafkaNamespace ns;

    /**
     * Creates a manager without any name prefix.
     *
     * @param zkServers the ZooKeeper servers, handed over to {@link KafkaUtils}
     * @param producerProperties the Kafka producer properties
     * @param consumerProperties the Kafka consumer properties
     */
    public KafkaLogManager(String zkServers, Properties producerProperties, Properties consumerProperties) {
        this(zkServers, null, producerProperties, consumerProperties);
    }

    /**
     * Creates a manager using the given prefix for its namespace.
     *
     * @param zkServers the ZooKeeper servers, handed over to {@link KafkaUtils}
     * @param prefix the namespace prefix, {@code null} is treated as an empty prefix
     * @param producerProperties the Kafka producer properties, may be {@code null}; may also carry
     *            {@link #DEFAULT_REPLICATION_FACTOR_PROP} (defaults to 1)
     * @param consumerProperties the Kafka consumer properties, may be {@code null}; may also carry
     *            {@link #DISABLE_SUBSCRIBE_PROP} (defaults to false)
     * @throws IllegalArgumentException if no bootstrap servers can be found in the given properties
     * @throws NumberFormatException if the replication factor is not a valid short
     */
    public KafkaLogManager(String zkServers, String prefix, Properties producerProperties,
            Properties consumerProperties) {
        this.prefix = (prefix != null) ? prefix : "";
        this.ns = new KafkaNamespace(this.prefix);
        // The custom properties are read from the caller's (possibly null) properties because the normalized
        // copies below have them stripped.
        disableSubscribe = consumerProperties != null
                && Boolean.parseBoolean(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
        String replicationFactor = (producerProperties != null)
                ? producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1")
                : "1";
        defaultReplicationFactor = Short.parseShort(replicationFactor);
        this.producerProperties = normalizeProducerProperties(producerProperties);
        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
        this.adminProperties = createAdminProperties(producerProperties, consumerProperties);
        this.kUtils = new KafkaUtils(zkServers, adminProperties);
    }

    /**
     * Creates the topic backing the Log {@code name} with {@code size} partitions and the default replication factor.
     */
    @Override
    public void create(String name, int size) {
        kUtils.createTopic(ns.getTopicName(name), size, defaultReplicationFactor);
    }

    /**
     * Tells whether the topic backing the Log {@code name} exists.
     */
    @Override
    public boolean exists(String name) {
        return kUtils.topicExists(ns.getTopicName(name));
    }

    /**
     * Opens a Kafka appender on the Log {@code name} using the normalized producer and consumer properties.
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
        return KafkaLogAppender.open(ns, name, producerProperties, consumerProperties);
    }

    /**
     * Creates a tailer assigned to the given partitions, after validating each of them. The tailer receives its own
     * copy of the consumer properties.
     *
     * @throws IllegalArgumentException if a partition index is out of bound for its Log
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
            String group) {
        partitions.forEach(this::checkValidPartition);
        return KafkaLogTailer.createAndAssign(ns, partitions, group, (Properties) consumerProperties.clone());
    }

    /**
     * Checks that the partition index lies within {@code [0, number of partitions of the topic)}.
     *
     * @param partition the partition to validate
     * @throws IllegalArgumentException if the partition index is negative or not lower than the partition count
     */
    protected void checkValidPartition(LogPartition partition) {
        int partitions = kUtils.getNumberOfPartitions(ns.getTopicName(partition.name()));
        int index = partition.partition();
        if (index < 0 || index >= partitions) {
            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
        }
    }

    /**
     * Returns the normalized producer properties held by this manager (the instance itself, not a copy).
     */
    public Properties getProducerProperties() {
        return producerProperties;
    }

    /**
     * Returns the normalized consumer properties held by this manager (the instance itself, not a copy).
     */
    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    /**
     * Returns the admin client properties held by this manager (the instance itself, not a copy).
     */
    public Properties getAdminProperties() {
        return adminProperties;
    }

    /**
     * Closes the resources managed by the parent class, then the Kafka utilities.
     */
    @Override
    public void close() {
        super.close();
        if (kUtils != null) {
            kUtils.close();
        }
    }

    /**
     * Subscribe is supported unless {@link #DISABLE_SUBSCRIBE_PROP} was set to {@code true} in the consumer
     * properties.
     */
    @Override
    public boolean supportSubscribe() {
        return !disableSubscribe;
    }

    /**
     * Creates a tailer subscribed to the given Log names. The tailer receives its own copy of the consumer
     * properties.
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener) {
        return KafkaLogTailer.createAndSubscribe(ns, names, group, (Properties) consumerProperties.clone(), listener);
    }

    /**
     * Returns a copy of the given producer properties with the key/value serializers forced to String/Bytes and the
     * custom {@link #DEFAULT_REPLICATION_FACTOR_PROP} removed.
     *
     * @param producerProperties the source properties, {@code null} is treated as empty
     * @return a new {@link Properties} instance, the argument is left untouched
     */
    protected Properties normalizeProducerProperties(Properties producerProperties) {
        Properties ret;
        if (producerProperties != null) {
            ret = (Properties) producerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return ret;
    }

    /**
     * Returns a copy of the given consumer properties with the key/value deserializers forced to String/Bytes, auto
     * commit disabled, offset reset set to {@code earliest} and the custom {@link #DISABLE_SUBSCRIBE_PROP} removed.
     *
     * @param consumerProperties the source properties, {@code null} is treated as empty
     * @return a new {@link Properties} instance, the argument is left untouched
     */
    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
        Properties ret;
        if (consumerProperties != null) {
            ret = (Properties) consumerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.BytesDeserializer");
        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ret.remove(DISABLE_SUBSCRIBE_PROP);
        return ret;
    }

    /**
     * Builds the admin client properties, which only hold the bootstrap servers: the producer value is used when
     * present, otherwise the consumer one.
     *
     * @param producerProperties the producer properties, may be {@code null}
     * @param consumerProperties the consumer properties, may be {@code null}
     * @return a new {@link Properties} instance holding the bootstrap servers
     * @throws IllegalArgumentException if neither properties define the bootstrap servers
     */
    protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) {
        Object bootstrapServers = null;
        if (producerProperties != null) {
            bootstrapServers = producerProperties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        }
        if (bootstrapServers == null && consumerProperties != null) {
            bootstrapServers = consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        }
        if (bootstrapServers == null) {
            // Properties rejects null values, fail with an explicit message instead of a bare NPE
            throw new IllegalArgumentException(
                    "No " + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG + " found in producer or consumer properties");
        }
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return ret;
    }

    /**
     * Returns the lag of a consumer group on each partition of a Log, indexed by partition number.
     * <p>
     * A short-lived consumer is created with the group id {@code prefix + group}. For each partition the committed
     * offset counts as 0 when the group has not committed anything, and the end offset counts as 0 when Kafka does
     * not report one.
     *
     * @param name the Log name
     * @param group the consumer group name, without prefix
     * @return one {@link LogLag} per partition
     */
    @Override
    public List<LogLag> getLagPerPartition(String name, String group) {
        Properties props = (Properties) consumerProperties.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, prefix + group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> topicPartitions = consumer.partitionsFor(ns.getTopicName(name))
                                                           .stream()
                                                           .map(meta -> new TopicPartition(meta.topic(),
                                                                   meta.partition()))
                                                           .collect(Collectors.toList());
            LogLag[] ret = new LogLag[topicPartitions.size()];
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                long committedOffset = 0L;
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                if (committed != null) {
                    committedOffset = committed.offset();
                }
                Long endOffset = endOffsets.get(topicPartition);
                if (endOffset == null) {
                    endOffset = 0L;
                }
                // the result is positioned by partition number, not by iteration order
                ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
            }
            return Arrays.asList(ret);
        }
    }

    /**
     * Lists the Log names of all the topics whose name starts with the prefix.
     */
    @Override
    public List<String> listAll() {
        return kUtils.listTopics()
                     .stream()
                     .filter(name -> name.startsWith(prefix))
                     .map(ns::getLogName)
                     .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties="
                + consumerProperties + ", prefix='" + prefix + '\'' + '}';
    }

    /**
     * Lists the consumer groups of a Log, keeping only the Kafka groups whose name starts with the prefix.
     *
     * @param name the Log name
     * @return the group names as translated by the namespace
     * @throws IllegalArgumentException if the Log does not exist
     */
    @Override
    public List<String> listConsumerGroups(String name) {
        String topic = ns.getTopicName(name);
        if (!exists(name)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        return kUtils.listConsumers(topic)
                     .stream()
                     .filter(group -> group.startsWith(prefix))
                     .map(ns::getGroup)
                     .collect(Collectors.toList());
    }

}