/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.log.LogPartition;

/**
 * Misc Kafka Utils.
 * <p>
 * An instance creates and owns an {@link AdminClient}; call {@link #close()} to release it. The static helpers
 * ({@link #kafkaDetected()}, {@link #rangeAssignments(int, Map)}, {@link #roundRobinAssignments(int, Map)}) do not
 * need an instance.
 *
 * @since 9.3
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    /** System property read by {@link #getBootstrapServers()}. */
    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";

    /** Bootstrap servers used when {@link #BOOTSTRAP_SERVERS_PROP} is unset or empty. */
    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    protected final AdminClient adminClient;

    /** Last consumer group listing, set to an unmodifiable list by {@link #listAllConsumers()}. */
    protected volatile List<String> allConsumers;

    /** Epoch millis of the last non-empty consumer group listing. */
    protected volatile long allConsumersTime;

    /** How long a non-empty consumer group listing is reused before the cluster is queried again. */
    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;

    /** Maximum time {@link #close()} waits for the admin client to shut down. */
    protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S = 5;

    /** Creates an instance whose admin client uses {@link #getDefaultAdminProperties()}. */
    public KafkaUtils() {
        this(getDefaultAdminProperties());
    }

    /**
     * Creates an instance with an admin client built from the given properties.
     *
     * @param adminProperties properties passed to {@link AdminClient#create(Properties)}
     */
    public KafkaUtils(Properties adminProperties) {
        this.adminClient = AdminClient.create(adminProperties);
    }

    /**
     * Returns admin client properties containing only the bootstrap servers.
     *
     * @return a new {@link Properties} with {@code bootstrap.servers} set from {@link #getBootstrapServers()}
     */
    public static Properties getDefaultAdminProperties() {
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        return ret;
    }

    /**
     * Returns the bootstrap servers from the {@value #BOOTSTRAP_SERVERS_PROP} system property, falling back to
     * {@value #DEFAULT_BOOTSTRAP_SERVERS} when the property is unset or empty.
     */
    public static String getBootstrapServers() {
        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP);
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            return DEFAULT_BOOTSTRAP_SERVERS;
        }
        return bootstrapServers;
    }

    /**
     * Checks whether a Kafka cluster answers, using the default admin properties.
     *
     * @return {@code true} if the cluster nodes are described within 5 seconds, {@code false} on timeout
     * @throws StreamRuntimeException if the request fails or the calling thread is interrupted
     */
    @SuppressWarnings("TryFinallyCanBeTryWithResources")
    public static boolean kafkaDetected() {
        AdminClient client = AdminClient.create(getDefaultAdminProperties());
        try {
            client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        } catch (TimeoutException e) {
            return false;
        } finally {
            // cannot use try with resource because of timeout: close() is called with a zero timeout
            client.close(0, TimeUnit.SECONDS);
        }
    }

    /**
     * Distributes the partitions of the given streams over {@code threads} consumers using Kafka's
     * {@link RangeAssignor}.
     *
     * @param threads number of consumers, must not be negative
     * @param streams stream name to number of partitions
     * @return one list of assigned partitions per thread, indexed by thread number
     */
    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Distributes the partitions of the given streams over {@code threads} consumers using Kafka's
     * {@link RoundRobinAssignor}.
     *
     * @param threads number of consumers, must not be negative
     * @param streams stream name to number of partitions
     * @return one list of assigned partitions per thread, indexed by thread number
     */
    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Runs {@code assignor} offline against a synthetic {@link Cluster} holding the partitions of {@code streams},
     * with {@code threads} members named "0".."threads-1", each subscribed to every stream.
     *
     * @param assignor the Kafka partition assignor to use
     * @param threads number of consumers, must not be negative
     * @param streams stream name to number of partitions
     * @return one list of assigned partitions per thread, indexed by thread number
     * @throws IllegalArgumentException if {@code threads} is negative
     */
    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
            Map<String, Integer> streams) {
        if (threads < 0) {
            // previously surfaced later as an opaque "Illegal Capacity" error from ArrayList
            throw new IllegalArgumentException("Invalid number of threads: " + threads);
        }
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
                Collections.emptySet());
        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
        List<List<LogPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i))
                               .partitions()
                               .stream()
                               .map(part -> new LogPartition(part.topic(), part.partition()))
                               .collect(Collectors.toList()));
        }
        return ret;
    }

    /**
     * Builds synthetic partition descriptors {@code 0..partitions-1} for {@code topic}, with no leader, replicas
     * or in-sync replicas, for use in an offline {@link Cluster}.
     */
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    /**
     * Creates a topic with a replication factor of 1.
     *
     * @see #createTopic(String, int, short)
     */
    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, (short) 1);
    }

    /**
     * Creates a topic and waits up to 5 minutes for the creation to complete.
     * <p>
     * NOTE(review): the existence check and the creation are not atomic; if the topic is created concurrently in
     * between, the failure is reported as a {@link StreamRuntimeException} rather than an
     * {@link IllegalArgumentException}.
     *
     * @throws IllegalArgumentException if the topic already exists
     * @throws StreamRuntimeException if creation fails, times out, or the thread is interrupted
     */
    public void createTopic(String topic, int partitions, short replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
        if (topicExists(topic)) {
            throw new IllegalArgumentException("Cannot create Topic already exists: " + topic);
        }
        CreateTopicsResult ret = adminClient.createTopics(
                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        try {
            ret.all().get(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        } catch (TimeoutException e) {
            throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e);
        }
    }

    /** Returns {@code true} if the topic exists, i.e. {@link #partitions(String)} is positive. */
    public boolean topicExists(String topic) {
        return partitions(topic) > 0;
    }

    /**
     * Returns the number of partitions of a topic.
     *
     * @return the partition count, or {@code -1} if the topic does not exist
     * @throws StreamRuntimeException on any other admin failure or if the thread is interrupted
     */
    public int partitions(String topic) {
        try {
            TopicDescription desc = describeTopic(topic);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Topic %s exists: %s", topic, desc));
            }
            return desc.partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return -1;
            }
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Returns the names of the topics reported by the admin client's {@code listTopics()}.
     *
     * @throws StreamRuntimeException if the request fails or the thread is interrupted
     */
    public Set<String> listTopics() {
        try {
            return adminClient.listTopics().names().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Returns the consumer groups whose committed offsets include a partition of {@code topic}.
     * <p>
     * Uses the cached group listing of {@link #listAllConsumers()} and issues one offsets request per group.
     */
    public List<String> listConsumers(String topic) {
        return listAllConsumers().stream()
                                 .filter(consumer -> getConsumerTopics(consumer).contains(topic))
                                 .collect(Collectors.toList());
    }

    /**
     * Returns the distinct topics on which a consumer group has committed offsets.
     *
     * @throws StreamRuntimeException if the request fails or the thread is interrupted
     */
    protected List<String> getConsumerTopics(String group) {
        try {
            return adminClient.listConsumerGroupOffsets(group)
                              .partitionsToOffsetAndMetadata()
                              .get()
                              .keySet()
                              .stream()
                              .map(TopicPartition::topic)
                              // one entry per committed partition otherwise
                              .distinct()
                              .collect(Collectors.toList());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Returns the ids of all consumer groups.
     * <p>
     * A non-empty result is cached for {@link #ALL_CONSUMERS_CACHE_TIMEOUT_MS}; an empty result is not cached, so
     * the next call queries the cluster again. The returned list is unmodifiable because it is shared by all callers
     * until the cache expires.
     *
     * @throws StreamRuntimeException if the request fails or the thread is interrupted
     */
    public synchronized List<String> listAllConsumers() {
        long now = System.currentTimeMillis();
        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
            List<String> groups;
            try {
                groups = adminClient.listConsumerGroups()
                                    .all()
                                    .get()
                                    .stream()
                                    .map(ConsumerGroupListing::groupId)
                                    .collect(Collectors.toList());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamRuntimeException(e);
            } catch (ExecutionException e) {
                throw new StreamRuntimeException(e);
            }
            allConsumers = Collections.unmodifiableList(groups);
            if (!groups.isEmpty()) {
                allConsumersTime = now;
            }
        }
        return allConsumers;
    }

    /**
     * Returns the number of partitions of a topic.
     * <p>
     * Unlike {@link #partitions(String)}, an unknown topic is reported as a {@link StreamRuntimeException} instead of
     * {@code -1}.
     *
     * @throws StreamRuntimeException if the topic is unknown, the request fails, or the thread is interrupted
     */
    public int getNumberOfPartitions(String topic) {
        try {
            return describeTopic(topic).partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Describes a single topic, blocking until the admin request completes.
     *
     * @throws ExecutionException if the request failed; the cause is an {@link UnknownTopicOrPartitionException}
     *             when the topic does not exist
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private TopicDescription describeTopic(String topic) throws InterruptedException, ExecutionException {
        DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
        return descriptions.values().get(topic).get();
    }

    /** Closes the admin client, waiting at most {@link #ADMIN_CLIENT_CLOSE_TIMEOUT_S} seconds. */
    @Override
    public void close() {
        adminClient.close(ADMIN_CLIENT_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
        log.debug("Closed.");
    }

}