/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.Name;

/**
 * Misc Kafka Utils.
 * <p>
 * Wraps a Kafka {@link AdminClient} to create, describe, list and delete topics and to list consumer groups. It also
 * provides static helpers to compute partition assignments with the Kafka assignors without contacting a broker.
 * <p>
 * An instance owns its admin client and must be {@link #close() closed} after use.
 *
 * @since 9.3
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    /** Name of the system property read by {@link #getBootstrapServers()}. */
    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";

    /** Bootstrap servers used when {@link #BOOTSTRAP_SERVERS_PROP} is unset or empty. */
    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    protected final AdminClient adminClient;

    /** Last consumer group ids fetched by {@link #listAllConsumers()}, {@code null} until the first fetch. */
    protected volatile List<String> allConsumers;

    /** Timestamp in ms (from {@link System#currentTimeMillis()}) of the last non-empty consumer group fetch. */
    protected volatile long allConsumersTime;

    /** Maximum age in ms of the cached consumer group list before it is fetched again. */
    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;

    /** Timeout in seconds given to the admin client when closing. */
    protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S = 5;

    /**
     * Creates an instance using {@link #getDefaultAdminProperties()}.
     */
    public KafkaUtils() {
        this(getDefaultAdminProperties());
    }

    /**
     * Creates an instance with an admin client built from the given properties.
     *
     * @param adminProperties the configuration passed to {@link AdminClient#create(Properties)}
     */
    public KafkaUtils(Properties adminProperties) {
        this.adminClient = AdminClient.create(adminProperties);
    }

    /**
     * Builds the default admin client configuration: the bootstrap servers returned by
     * {@link #getBootstrapServers()} and a 10 seconds request timeout.
     *
     * @return a new {@link Properties} instance
     */
    public static Properties getDefaultAdminProperties() {
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        ret.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10_000);
        return ret;
    }

    /**
     * Returns the bootstrap servers taken from the {@link #BOOTSTRAP_SERVERS_PROP} system property.
     *
     * @return the property value, or {@link #DEFAULT_BOOTSTRAP_SERVERS} when the property is unset or empty
     */
    public static String getBootstrapServers() {
        // getProperty with a non-null default never returns null, only the empty case needs a fallback
        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
        if (bootstrapServers.isEmpty()) {
            bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
        }
        return bootstrapServers;
    }

    /**
     * Checks whether a Kafka cluster answers on the default bootstrap servers by describing the cluster nodes with a 5
     * seconds timeout.
     *
     * @return {@code true} if the cluster nodes were returned in time, {@code false} on timeout
     * @throws StreamRuntimeException if the thread is interrupted or the request fails for another reason
     */
    @SuppressWarnings("TryFinallyCanBeTryWithResources")
    public static boolean kafkaDetected() {
        AdminClient client = AdminClient.create(getDefaultAdminProperties());
        try {
            client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        } catch (TimeoutException e) {
            return false;
        } finally {
            // cannot use try with resource because of timeout
            client.close(Duration.ofSeconds(1));
        }
    }

    /**
     * Computes the partition assignments of {@code threads} consumers using the Kafka {@link RangeAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        RangeAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Computes the partition assignments of {@code threads} consumers using the Kafka {@link RoundRobinAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        RoundRobinAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Runs the given assignor against an in-memory cluster description built from {@code streams}. Each consumer is
     * named after its index and subscribes to all the streams.
     *
     * @param assignor the Kafka assignor to apply
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    protected static List<List<LogPartition>> assignments(ConsumerPartitionAssignor assignor, int threads,
            Map<String, Integer> streams) {
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new ConsumerPartitionAssignor.Subscription(streamNames));
        }
        // the cluster only carries the partition list, there are no nodes and no special topics
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
                Collections.emptySet());
        Map<String, ConsumerPartitionAssignor.Assignment> assignments = assignor.assign(cluster,
                new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        List<List<LogPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i))
                               .partitions()
                               .stream()
                               .map(part -> new LogPartition(Name.ofUrn(part.topic()), part.partition()))
                               .collect(Collectors.toList()));
        }
        return ret;
    }

    /**
     * Builds the partition descriptions of a topic, numbered from 0 to {@code partitions - 1}, without leader nor
     * replica information.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @return the partition descriptions, empty if {@code partitions} is not positive
     */
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    /**
     * Creates a topic with a replication factor of 1.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @see #createTopic(String, int, short)
     */
    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, (short) 1);
    }

    /**
     * Creates a topic and waits up to 5 minutes for the creation to complete. An already existing topic is not an
     * error, it is only logged.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @param replicationFactor the replication factor
     * @throws StreamRuntimeException if the thread is interrupted, the creation fails for a reason other than the
     *             topic already existing, or the timeout is reached
     */
    public void createTopic(String topic, int partitions, short replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
        CreateTopicsResult ret = adminClient.createTopics(
                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        try {
            ret.all().get(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            if ((e.getCause() instanceof TopicExistsException)) {
                log.info("topic: " + topic + " exists");
            } else {
                throw new StreamRuntimeException(e);
            }
        } catch (TimeoutException e) {
            throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e);
        }
    }

    /**
     * Tells whether the topic exists, i.e. whether {@link #partitions(String)} reports at least one partition.
     *
     * @param topic the topic name
     * @return {@code true} if the topic exists
     * @throws StreamRuntimeException see {@link #partitions(String)}
     */
    public boolean topicExists(String topic) {
        return partitions(topic) > 0;
    }

    /**
     * Returns the number of partitions of a topic.
     *
     * @param topic the topic name
     * @return the number of partitions, or {@code -1} if the topic is unknown
     * @throws StreamRuntimeException if the thread is interrupted or the request fails for another reason
     */
    public int partitions(String topic) {
        try {
            TopicDescription desc = adminClient.describeTopics(Collections.singletonList(topic))
                                               .values()
                                               .get(topic)
                                               .get();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Topic %s exists: %s", topic, desc));
            }
            return desc.partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return -1;
            }
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Lists the topic names returned by the admin client.
     *
     * @return the topic names
     * @throws StreamRuntimeException if the thread is interrupted or the request fails
     */
    public Set<String> listTopics() {
        try {
            return adminClient.listTopics().names().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Lists the consumer groups that have committed offsets on the given topic. This issues one offsets request per
     * consumer group returned by {@link #listAllConsumers()}.
     *
     * @param topic the topic name
     * @return the matching consumer group ids
     * @throws StreamRuntimeException if the thread is interrupted or a request fails
     */
    public List<String> listConsumers(String topic) {
        return listAllConsumers().stream()
                                 .filter(consumer -> getConsumerTopics(consumer).contains(topic))
                                 .collect(Collectors.toList());
    }

    /**
     * Returns the topics on which a consumer group has committed offsets. A topic appears once per partition with a
     * committed offset.
     *
     * @param group the consumer group id
     * @return the topic names, possibly with duplicates
     * @throws StreamRuntimeException if the thread is interrupted or the request fails
     */
    protected List<String> getConsumerTopics(String group) {
        try {
            return adminClient.listConsumerGroupOffsets(group)
                              .partitionsToOffsetAndMetadata()
                              .get()
                              .keySet()
                              .stream()
                              .map(TopicPartition::topic)
                              .collect(Collectors.toList());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Lists all consumer group ids. The result is cached for {@link #ALL_CONSUMERS_CACHE_TIMEOUT_MS}; an empty result
     * is not time-stamped, so the next call queries the cluster again.
     * <p>
     * The returned list is the cached instance, callers should treat it as read-only.
     *
     * @return the consumer group ids
     * @throws StreamRuntimeException if the thread is interrupted or the request fails
     */
    public synchronized List<String> listAllConsumers() {
        long now = System.currentTimeMillis();
        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
            try {
                allConsumers = adminClient.listConsumerGroups()
                                          .all()
                                          .get()
                                          .stream()
                                          .map(ConsumerGroupListing::groupId)
                                          .collect(Collectors.toList());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamRuntimeException(e);
            } catch (ExecutionException e) {
                throw new StreamRuntimeException(e);
            }
            if (!allConsumers.isEmpty()) {
                allConsumersTime = now;
            }
        }
        return allConsumers;
    }

    /**
     * Returns the number of partitions of a topic. Unlike {@link #partitions(String)}, an unknown topic is reported
     * as an exception.
     *
     * @param topic the topic name
     * @return the number of partitions
     * @throws StreamRuntimeException if the thread is interrupted or the request fails, including when the topic is
     *             unknown
     */
    public int getNumberOfPartitions(String topic) {
        DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
        try {
            return descriptions.values().get(topic).get().partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Closes the admin client, waiting at most {@link #ADMIN_CLIENT_CLOSE_TIMEOUT_S} seconds.
     */
    @Override
    public void close() {
        adminClient.close(Duration.ofSeconds(ADMIN_CLIENT_CLOSE_TIMEOUT_S));
        log.debug("Closed.");
    }

    /**
     * Deletes a topic and waits for the outcome of the request.
     * <p>
     * The deletion is best-effort: a failure reported by the cluster is logged and reported as {@code false} instead
     * of being thrown.
     *
     * @param topic the topic name
     * @return {@code true} if the cluster acknowledged the deletion, {@code false} if the deletion failed, for
     *         instance because the topic does not exist
     * @throws StreamRuntimeException if the thread is interrupted while waiting
     */
    public boolean delete(String topic) {
        log.info("Deleting topic: " + topic);
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topic));
        try {
            // the request is asynchronous: testing isDone() right after submitting it is a race that almost always
            // yields false, so wait for the actual outcome
            result.values().get(topic).get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                log.info("topic: " + topic + " does not exist");
            } else {
                log.warn("Unable to delete topic: " + topic, e);
            }
            return false;
        }
    }
}