/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogConfig;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * {@link org.nuxeo.lib.stream.log.LogManager} implementation backed by Apache Kafka topics.
 * <p>
 * A list of {@link KafkaLogConfig} maps Log/group names to Kafka connection settings; each config owns a
 * {@link KafkaUtils} admin helper created eagerly in the constructor and released in {@link #close()}.
 *
 * @since 9.3
 */
public class KafkaLogManager extends AbstractLogManager {

    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

    /** All known configs; name/group resolution walks this list in order. */
    protected final List<KafkaLogConfig> configs;

    /** Fallback config used when no config matches a name (see {@link #findDefaultConfig()}). */
    protected final KafkaLogConfig defaultConfig;

    /** One admin helper per config, created in the constructor, closed in {@link #close()}. */
    protected final Map<KafkaLogConfig, KafkaUtils> kUtils = new HashMap<>();

    /**
     * Creates a manager with a single default config built from the given prefix and properties.
     *
     * @since 10.2
     */
    public KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties) {
        this(Collections.singletonList(new KafkaLogConfig("unknown", true, Collections.emptyList(), prefix, null,
                producerProperties, consumerProperties)));
    }

    /**
     * Creates a manager from an explicit list of configs.
     *
     * @param kafkaConfigs the configs, must not be {@code null} nor empty
     * @throws IllegalArgumentException if {@code kafkaConfigs} is {@code null} or empty
     * @since 11.1
     */
    public KafkaLogManager(List<KafkaLogConfig> kafkaConfigs) {
        // BUG FIX: was "kafkaConfigs == null && kafkaConfigs.isEmpty()", which throws NPE when the
        // list is null (short-circuit still evaluates isEmpty() on null) and never rejects an empty
        // list. The intended guard is null OR empty.
        if (kafkaConfigs == null || kafkaConfigs.isEmpty()) {
            throw new IllegalArgumentException("config required");
        }
        this.configs = kafkaConfigs;
        this.defaultConfig = findDefaultConfig();
        configs.forEach(config -> kUtils.put(config, new KafkaUtils(config.getAdminProperties())));
    }

    /**
     * Returns the last config flagged as default, or the last config of the list when none is flagged.
     */
    protected KafkaLogConfig findDefaultConfig() {
        List<KafkaLogConfig> defaultConfigs = configs.stream()
                                                     .filter(LogConfig::isDefault)
                                                     .collect(Collectors.toList());
        // use the last default config
        if (defaultConfigs.isEmpty()) {
            return configs.get(configs.size() - 1);
        }
        return defaultConfigs.get(defaultConfigs.size() - 1);
    }

    /** Returns the first config matching the Log name, falling back to the default config. */
    protected KafkaLogConfig getConfig(Name name) {
        return configs.stream().filter(config -> config.match(name)).findFirst().orElse(defaultConfig);
    }

    /** Returns the first config matching the Log name and group, falling back to the default config. */
    protected KafkaLogConfig getConfig(Name name, Name group) {
        return configs.stream().filter(config -> config.match(name, group)).findFirst().orElse(defaultConfig);
    }

    @Override
    public void create(Name name, int size) {
        KafkaLogConfig config = getConfig(name);
        kUtils.get(config).createTopic(config.getResolver().getId(name), size, config.getReplicatorFactor());
    }

    @Override
    protected int getSize(Name name) {
        KafkaLogConfig config = getConfig(name);
        return kUtils.get(config).partitions(config.getResolver().getId(name));
    }

    @Override
    public boolean exists(Name name) {
        KafkaLogConfig config = getConfig(name);
        return kUtils.get(config).topicExists(config.getResolver().getId(name));
    }

    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec) {
        KafkaLogConfig config = getConfig(name);
        return KafkaLogAppender.open(codec, config.getResolver(), name, config.getProducerProperties(),
                config.getConsumerProperties());
    }

    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group,
            Codec<M> codec) {
        partitions.forEach(this::checkValidPartition);
        if (partitions.isEmpty()) {
            // no partition to derive a config from, use the default one
            return KafkaLogTailer.createAndAssign(codec, defaultConfig.getResolver(), partitions, group,
                    (Properties) defaultConfig.getConsumerProperties().clone());
        }
        // NOTE(review): the config of the first partition is applied to all of them — assumes all
        // partitions of a tailer share the same config
        KafkaLogConfig config = getConfig(partitions.iterator().next().name());
        return KafkaLogTailer.createAndAssign(codec, config.getResolver(), partitions, group,
                (Properties) config.getConsumerProperties().clone());
    }

    /**
     * @throws IllegalArgumentException if the partition index exceeds the topic's partition count
     */
    protected void checkValidPartition(LogPartition partition) {
        KafkaLogConfig config = getConfig(partition.name());
        int partitions = kUtils.get(config).getNumberOfPartitions(config.getResolver().getId(partition.name()));
        if (partition.partition() >= partitions) {
            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
        }
    }

    @Override
    public void close() {
        super.close();
        // release the admin helpers created in the constructor
        configs.forEach(config -> kUtils.get(config).close());
    }

    @Override
    public boolean supportSubscribe() {
        return !defaultConfig.getDisableSubscribe();
    }

    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
            RebalanceListener listener, Codec<M> codec) {
        // the config of the first name is used for the whole subscription
        KafkaLogConfig config = getConfig(names.iterator().next(), group);
        return KafkaLogTailer.createAndSubscribe(codec, config.getResolver(), names, group,
                (Properties) config.getConsumerProperties().clone(), listener);
    }

    @Override
    public List<LogLag> getLagPerPartition(Name name, Name group) {
        KafkaLogConfig config = getConfig(name, group);
        Properties props = (Properties) config.getConsumerProperties().clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getResolver().getId(group));
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getResolver().getId(group) + "-lag");
        // Prevents to create multiple consumers with the same client/group ids
        synchronized (KafkaLogManager.class) {
            try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
                Set<TopicPartition> topicPartitions = consumer.partitionsFor(config.getResolver().getId(name))
                                                             .stream()
                                                             .map(meta -> new TopicPartition(meta.topic(),
                                                                     meta.partition()))
                                                             .collect(Collectors.toSet());
                LogLag[] ret = new LogLag[topicPartitions.size()];
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);
                for (TopicPartition topicPartition : topicPartitions) {
                    // a partition without committed offset counts as position 0
                    OffsetAndMetadata committed = committedOffsets.get(topicPartition);
                    long committedOffset = 0L;
                    if (committed != null) {
                        committedOffset = committed.offset();
                    }
                    Long endOffset = endOffsets.get(topicPartition);
                    if (endOffset == null) {
                        endOffset = 0L;
                    }
                    ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
                }
                return Arrays.asList(ret);
            }
        }
    }

    @Override
    public List<Name> listAllNames() {
        // topics are listed through the default config's admin client; only topics whose prefix
        // matches a known config are translated back into Names
        Set<String> allTopics = kUtils.get(defaultConfig).listTopics();
        Set<Name> names = new HashSet<>(allTopics.size());
        for (String topic : allTopics) {
            for (KafkaLogConfig config : configs) {
                if (topic.startsWith(config.getResolver().getPrefix())) {
                    names.add(config.getResolver().getName(topic));
                }
            }
        }
        return new ArrayList<>(names);
    }

    @Override
    public String toString() {
        // TODO: filter displayed props
        // BUG FIX: "defaultResolver" was missing its '=' separator, producing malformed output
        return "KafkaLogManager{" + "configs=" + configs + ", defaultConfig=" + defaultConfig + ", defaultResolver="
                + defaultConfig.getResolver()
                + '}';
    }

    /**
     * Masks any password values in the properties' string form before display.
     */
    protected String filterDisplayedProperties(Properties properties) {
        String ret = properties.toString();
        if (!ret.contains("password")) {
            return ret;
        }
        return ret.replaceAll("password=.[^\\\"\\;\\,\\ ]*", "password=****");
    }

    @Override
    public List<Name> listConsumerGroups(Name name) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        if (!kUtils.get(config).topicExists(topic)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        return kUtils.get(config)
                     .listConsumers(topic)
                     .stream()
                     .filter(group -> group.startsWith(config.getResolver().getPrefix()))
                     .map(config.getResolver()::getName)
                     .collect(Collectors.toList());
    }

    @Override
    public boolean delete(Name name) {
        KafkaLogConfig config = getConfig(name);
        return kUtils.get(config).delete(config.getResolver().getId(name));
    }
}