001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Properties; 028import java.util.Set; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import java.util.stream.Collectors; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.apache.kafka.clients.admin.AdminClientConfig; 037import org.apache.kafka.clients.admin.CreateTopicsResult; 038import org.apache.kafka.clients.admin.DescribeTopicsResult; 039import org.apache.kafka.clients.admin.NewTopic; 040import org.apache.kafka.clients.admin.TopicDescription; 041import org.apache.kafka.clients.consumer.RangeAssignor; 042import org.apache.kafka.clients.consumer.RoundRobinAssignor; 043import org.apache.kafka.clients.consumer.internals.PartitionAssignor; 044import org.apache.kafka.common.Cluster; 045import org.apache.kafka.common.PartitionInfo; 046import org.apache.kafka.common.TopicPartition; 047import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; 048import org.nuxeo.lib.stream.StreamRuntimeException; 049import org.nuxeo.lib.stream.log.LogPartition; 050 051import kafka.admin.AdminClient; 052import kafka.coordinator.group.GroupOverview; 053import scala.collection.Iterator; 054import scala.collection.JavaConverters; 055 056/** 057 * Misc Kafka Utils 058 * 059 * @since 9.3 060 */ 061public class KafkaUtils implements AutoCloseable { 062 private static final Log log = LogFactory.getLog(KafkaUtils.class); 063 064 public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers"; 065 066 public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; 067 068 protected final Properties adminProperties; 069 070 protected AdminClient adminClient; 071 072 protected org.apache.kafka.clients.admin.AdminClient newAdminClient; 073 074 protected List<String> allConsumers; 075 076 protected long allConsumersTime; 077 078 protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000; 079 080 public KafkaUtils() { 081 this(getDefaultAdminProperties()); 082 } 083 084 public KafkaUtils(Properties adminProperties) { 085 this.adminProperties = adminProperties; 086 } 087 088 public static Properties getDefaultAdminProperties() { 089 Properties ret = new Properties(); 090 ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); 091 return ret; 092 } 093 094 public static String getBootstrapServers() { 095 return System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS); 096 } 097 098 public static boolean kafkaDetected() { 099 AdminClient client = AdminClient.create(getDefaultAdminProperties()); 100 try { 101 client.findAllBrokers(); 102 return true; 103 } catch (RuntimeException e) { 104 return false; 105 } finally { 106 client.close(); 107 } 108 } 109 110 public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) { 111 PartitionAssignor assignor = new RangeAssignor(); 112 return assignments(assignor, threads, streams); 113 } 114 115 public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) { 116 PartitionAssignor assignor = new RoundRobinAssignor(); 117 return assignments(assignor, threads, streams); 118 } 119 120 protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads, 121 Map<String, Integer> streams) { 122 final List<PartitionInfo> parts = new ArrayList<>(); 123 streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size))); 124 Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); 125 List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList()); 126 for (int i = 0; i < threads; i++) { 127 subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames)); 128 } 129 Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(), 130 Collections.emptySet()); 131 Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions); 132 List<List<LogPartition>> ret = new ArrayList<>(threads); 133 for (int i = 0; i < threads; i++) { 134 ret.add(assignments.get(String.valueOf(i)) 135 .partitions() 136 .stream() 137 .map(part -> new LogPartition(part.topic(), part.partition())) 138 .collect(Collectors.toList())); 139 } 140 return ret; 141 } 142 143 protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) { 144 Collection<PartitionInfo> ret = new ArrayList<>(); 145 for (int i = 0; i < partitions; i++) { 146 ret.add(new PartitionInfo(topic, i, null, null, null)); 147 } 148 return ret; 149 } 150 151 public void createTopicWithoutReplication(String topic, int partitions) { 152 createTopic(topic, partitions, (short) 1); 153 } 154 155 public void createTopic(String topic, int partitions, short replicationFactor) { 156 log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor); 157 if (topicExists(topic)) { 158 throw new IllegalArgumentException("Cannot create Topic already exists: " + topic); 159 } 160 CreateTopicsResult ret = getNewAdminClient().createTopics( 161 Collections.singletonList(new NewTopic(topic, partitions, replicationFactor))); 162 try { 163 ret.all().get(5, TimeUnit.MINUTES); 164 } catch (InterruptedException e) { 165 Thread.currentThread().interrupt(); 166 throw new StreamRuntimeException(e); 167 } catch (ExecutionException e) { 168 throw new StreamRuntimeException(e); 169 } catch (TimeoutException e) { 170 throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e); 171 } 172 } 173 174 public boolean topicExists(String topic) { 175 return partitions(topic) > 0; 176 } 177 178 public int partitions(String topic) { 179 try { 180 TopicDescription desc = getNewAdminClient().describeTopics(Collections.singletonList(topic)) 181 .values() 182 .get(topic) 183 .get(); 184 if (log.isDebugEnabled()) { 185 log.debug(String.format("Topic %s exists: %s", topic, desc)); 186 } 187 return desc.partitions().size(); 188 } catch (InterruptedException e) { 189 Thread.currentThread().interrupt(); 190 throw new StreamRuntimeException(e); 191 } catch (ExecutionException e) { 192 if (e.getCause() instanceof UnknownTopicOrPartitionException) { 193 return -1; 194 } 195 throw new StreamRuntimeException(e); 196 } 197 } 198 199 public Set<String> listTopics() { 200 try { 201 return getNewAdminClient().listTopics().names().get(); 202 } catch (InterruptedException e) { 203 Thread.currentThread().interrupt(); 204 throw new StreamRuntimeException(e); 205 } catch (ExecutionException e) { 206 throw new StreamRuntimeException(e); 207 } 208 } 209 210 public List<String> listConsumers(String topic) { 211 return listAllConsumers().stream().filter(consumer -> getConsumerTopics(consumer).contains(topic)).collect( 212 Collectors.toList()); 213 } 214 215 protected List<String> getConsumerTopics(String group) { 216 return JavaConverters.mapAsJavaMap(getAdminClient().listGroupOffsets(group)) 217 .keySet() 218 .stream() 219 .map(TopicPartition::topic) 220 .collect(Collectors.toList()); 221 } 222 223 protected org.apache.kafka.clients.admin.AdminClient getNewAdminClient() { 224 if (newAdminClient == null) { 225 newAdminClient = org.apache.kafka.clients.admin.AdminClient.create(adminProperties); 226 } 227 return newAdminClient; 228 } 229 230 protected AdminClient getAdminClient() { 231 if (adminClient == null) { 232 adminClient = AdminClient.create(adminProperties); 233 } 234 return adminClient; 235 } 236 237 public List<String> listAllConsumers() { 238 long now = System.currentTimeMillis(); 239 if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) { 240 allConsumers = new ArrayList<>(); 241 // this returns only consumer group that use the subscribe API (known by coordinator) 242 scala.collection.immutable.List<GroupOverview> groups = getAdminClient().listAllConsumerGroupsFlattened(); 243 Iterator<GroupOverview> iterator = groups.iterator(); 244 GroupOverview group; 245 while (iterator.hasNext()) { 246 group = iterator.next(); 247 if (group != null && !allConsumers.contains(group.groupId())) { 248 allConsumers.add(group.groupId()); 249 } 250 } 251 if (!allConsumers.isEmpty()) { 252 allConsumersTime = now; 253 } 254 } 255 return allConsumers; 256 } 257 258 /** 259 * Work only if delete.topic.enable is true which is not the default 260 */ 261 public void markTopicForDeletion(String topic) { 262 log.debug("mark topic for deletion: " + topic); 263 try { 264 getNewAdminClient().deleteTopics(Collections.singleton(topic)).all().get(); 265 } catch (InterruptedException e) { 266 Thread.currentThread().interrupt(); 267 throw new StreamRuntimeException(e); 268 } catch (ExecutionException e) { 269 throw new StreamRuntimeException(e); 270 } 271 } 272 273 public int getNumberOfPartitions(String topic) { 274 DescribeTopicsResult descriptions = getNewAdminClient().describeTopics(Collections.singletonList(topic)); 275 try { 276 return descriptions.values().get(topic).get().partitions().size(); 277 } catch (InterruptedException e) { 278 Thread.currentThread().interrupt(); 279 throw new StreamRuntimeException(e); 280 } catch (ExecutionException e) { 281 throw new StreamRuntimeException(e); 282 } 283 } 284 285 @Override 286 public void close() { 287 if (adminClient != null) { 288 adminClient.close(); 289 adminClient = null; 290 } 291 if (newAdminClient != null) { 292 newAdminClient.close(); 293 newAdminClient = null; 294 } 295 log.debug("Closed."); 296 } 297 298}