/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.nuxeo.lib.stream.log.LogPartition;

import kafka.admin.AdminClient;
import kafka.admin.AdminUtils;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.coordinator.group.GroupOverview;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
 * Misc Kafka Utils.
 * <p>
 * An instance holds a ZooKeeper client (created eagerly by the constructor) and two Kafka admin clients (created
 * lazily on first use). Call {@link #close()} to release them.
 *
 * @since 9.3
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    /** Name of the system property read by {@link #getZkServers()}. */
    public static final String ZK_SERVERS_PROP = "kafka.zkServers";

    /** Value returned by {@link #getZkServers()} when {@link #ZK_SERVERS_PROP} is not set. */
    public static final String DEFAULT_ZK_SERVERS = "localhost:2181";

    /** Name of the system property read by {@link #getBootstrapServers()}. */
    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";

    /** Value returned by {@link #getBootstrapServers()} when {@link #BOOTSTRAP_SERVERS_PROP} is not set. */
    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    /** First timeout argument given to the {@link ZkClient} built by {@link #createZkClient(String)}. */
    public static final int ZK_TIMEOUT_MS = 6000;

    /** Second timeout argument given to the {@link ZkClient} built by {@link #createZkClient(String)}. */
    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

    protected final ZkClient zkClient;

    protected final ZkUtils zkUtils;

    protected final Properties adminProperties;

    /** Lazily created, see {@link #getAdminClient()}. */
    protected AdminClient adminClient;

    /** Lazily created, see {@link #getNewAdminClient()}. */
    protected org.apache.kafka.clients.admin.AdminClient newAdminClient;

    /** Cached result of {@link #listAllConsumers()}. */
    protected List<String> allConsumers;

    /** Timestamp (ms) of the last non empty refresh of {@link #allConsumers}. */
    protected long allConsumersTime;

    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;

    /**
     * Creates an instance using {@link #getZkServers()} and {@link #getDefaultAdminProperties()}.
     */
    public KafkaUtils() {
        this(getZkServers(), getDefaultAdminProperties());
    }

    /**
     * Creates an instance connected to the given ZooKeeper servers.
     *
     * @param zkServers the ZooKeeper connection string
     * @param adminProperties the properties used to create the Kafka admin clients on first use
     */
    public KafkaUtils(String zkServers, Properties adminProperties) {
        log.debug("Init zkServers: " + zkServers);
        this.zkClient = createZkClient(zkServers);
        this.zkUtils = createZkUtils(zkServers, zkClient);
        this.adminProperties = adminProperties;
    }

    /**
     * Returns admin properties containing only the bootstrap servers given by {@link #getBootstrapServers()}.
     */
    public static Properties getDefaultAdminProperties() {
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        return ret;
    }

    /**
     * Returns the value of the {@link #ZK_SERVERS_PROP} system property, or {@link #DEFAULT_ZK_SERVERS} if unset.
     */
    public static String getZkServers() {
        return System.getProperty(ZK_SERVERS_PROP, DEFAULT_ZK_SERVERS);
    }

    /**
     * Returns the value of the {@link #BOOTSTRAP_SERVERS_PROP} system property, or
     * {@link #DEFAULT_BOOTSTRAP_SERVERS} if unset.
     */
    public static String getBootstrapServers() {
        return System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
    }

    /**
     * Same as {@link #kafkaDetected(String)} using the servers returned by {@link #getZkServers()}.
     */
    public static boolean kafkaDetected() {
        return kafkaDetected(getZkServers());
    }

    /**
     * Probes the given ZooKeeper servers by opening and immediately closing a short lived client.
     * <p>
     * Note that only the ZooKeeper connection is checked here, no Kafka broker is contacted.
     *
     * @param zkServers the ZooKeeper connection string
     * @return {@code false} if the client creation fails with a {@link ZkTimeoutException}, {@code true} otherwise
     */
    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$);
            tmp.close();
        } catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    protected static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    protected static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
    }

    /**
     * Computes the partition assignments for {@code threads} consumers using a {@link RangeAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, in consumer index order
     */
    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Computes the partition assignments for {@code threads} consumers using a {@link RoundRobinAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, in consumer index order
     */
    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Runs the given assignor against an in-memory cluster description built from {@code streams}.
     * <p>
     * Consumers are named {@code "0"} to {@code "threads - 1"} and each one subscribes to all the stream names.
     *
     * @return one list of partitions per consumer, in consumer index order
     */
    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
            Map<String, Integer> streams) {
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        // the cluster has no node, only the partition list matters to the assignor
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
                Collections.emptySet());
        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
        List<List<LogPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i))
                               .partitions()
                               .stream()
                               .map(part -> new LogPartition(part.topic(), part.partition()))
                               .collect(Collectors.toList()));
        }
        return ret;
    }

    /**
     * Builds {@code partitions} {@link PartitionInfo} for the topic, numbered from 0, without leader nor replicas.
     */
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    /**
     * Creates a topic with a replication factor of 1, see {@link #createTopic(String, int, short)}.
     */
    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, (short) 1);
    }

    /**
     * Creates a topic and waits up to 5 minutes for the creation to complete.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @param replicationFactor the replication factor
     * @throws IllegalArgumentException if {@link #topicExists(String)} reports that the topic already exists
     * @throws RuntimeException if the creation fails, is interrupted or does not complete within the timeout
     */
    public void createTopic(String topic, int partitions, short replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
        if (topicExists(topic)) {
            throw new IllegalArgumentException("Cannot create Topic already exists: " + topic);
        }
        CreateTopicsResult ret = getNewAdminClient().createTopics(
                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        try {
            ret.all().get(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Unable to create topics " + topic + " within the timeout", e);
        }
    }

    /**
     * Checks the topic existence through ZooKeeper.
     */
    public boolean topicExists(String topic) {
        return AdminUtils.topicExists(zkUtils, topic);
    }

    /**
     * Returns all the topic names found through ZooKeeper.
     */
    public List<String> listTopics() {
        Seq<String> topics = zkUtils.getAllTopics();
        return JavaConversions.seqAsJavaList(topics);
    }

    /**
     * Returns the consumer groups from {@link #listAllConsumers()} that have offsets on the given topic.
     */
    public List<String> listConsumers(String topic) {
        return listAllConsumers().stream().filter(consumer -> getConsumerTopics(consumer).contains(topic)).collect(
                Collectors.toList());
    }

    /**
     * Returns the topic of each partition for which the group has an offset, one entry per partition.
     */
    protected List<String> getConsumerTopics(String group) {
        return JavaConversions.mapAsJavaMap(getAdminClient().listGroupOffsets(group))
                              .keySet()
                              .stream()
                              .map(TopicPartition::topic)
                              .collect(Collectors.toList());
    }

    protected org.apache.kafka.clients.admin.AdminClient getNewAdminClient() {
        if (newAdminClient == null) {
            newAdminClient = org.apache.kafka.clients.admin.AdminClient.create(adminProperties);
        }
        return newAdminClient;
    }

    protected AdminClient getAdminClient() {
        if (adminClient == null) {
            adminClient = AdminClient.create(adminProperties);
        }
        return adminClient;
    }

    /**
     * Returns the distinct consumer group ids reported by the admin client.
     * <p>
     * The result is cached for {@link #ALL_CONSUMERS_CACHE_TIMEOUT_MS} ms. The cache timestamp is not refreshed
     * when the list is empty.
     */
    public List<String> listAllConsumers() {
        long now = System.currentTimeMillis();
        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
            // build the list locally so that the cache is only replaced once the query has succeeded
            List<String> groupIds = new ArrayList<>();
            // guard used to keep only the first occurrence of each group id
            Set<String> seen = new HashSet<>();
            // this returns only consumer group that use the subscribe API (known by coordinator)
            scala.collection.immutable.List<GroupOverview> groups = getAdminClient().listAllConsumerGroupsFlattened();
            Iterator<GroupOverview> iterator = groups.iterator();
            while (iterator.hasNext()) {
                GroupOverview group = iterator.next();
                if (group != null && seen.add(group.groupId())) {
                    groupIds.add(group.groupId());
                }
            }
            allConsumers = groupIds;
            if (!allConsumers.isEmpty()) {
                allConsumersTime = now;
            }
        }
        return allConsumers;
    }

    /**
     * Works only if delete.topic.enable is true, which is not the default.
     */
    public void markTopicForDeletion(String topic) {
        log.debug("mark topic for deletion: " + topic);
        AdminUtils.deleteTopic(zkUtils, topic);
    }

    /**
     * Returns the number of partitions of the topic, waiting for the topic description without timeout.
     *
     * @throws RuntimeException if the description fails or if the wait is interrupted
     */
    public int getNumberOfPartitions(String topic) {
        DescribeTopicsResult descriptions = getNewAdminClient().describeTopics(Collections.singletonList(topic));
        try {
            return descriptions.values().get(topic).get().partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the consumer group information stored in ZooKeeper for the given topic.
     */
    public void resetConsumerStates(String topic) {
        log.debug("Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

    /**
     * Returns the connection string of every end point of every broker found through ZooKeeper.
     */
    public Set<String> getBrokerEndPoints() {
        Set<String> ret = new HashSet<>();
        // don't use "Seq<Broker> brokers" as it causes compilation issues with Eclipse
        // when calling brokers.iterator()
        // (The method iterator() is ambiguous for the type Seq<Broker>)
        IterableLike<Broker, Iterable<Broker>> brokers = zkUtils.getAllBrokersInCluster();
        Broker broker;
        Iterator<Broker> iterator = brokers.iterator();
        while (iterator.hasNext()) {
            broker = iterator.next();
            if (broker != null) {
                // don't use "Seq<EndPoint> endPoints" as it causes compilation issues with Eclipse
                // when calling endPoints.iterator()
                // (The method iterator() is ambiguous for the type Seq<EndPoint>)
                IterableLike<EndPoint, Iterable<EndPoint>> endPoints = broker.endPoints();
                Iterator<EndPoint> iterator2 = endPoints.iterator();
                while (iterator2.hasNext()) {
                    EndPoint endPoint = iterator2.next();
                    ret.add(endPoint.connectionString());
                }
            }
        }
        return ret;
    }

    /**
     * Returns the broker end points from {@link #getBrokerEndPoints()} joined with a comma.
     */
    public String getDefaultBootstrapServers() {
        return String.join(",", getBrokerEndPoints());
    }

    /**
     * Closes the ZooKeeper handles and the admin clients that have been created.
     * <p>
     * Each resource is closed even if closing a previous one fails.
     */
    @Override
    public void close() {
        // nested try/finally so that a failure on one resource does not leak the following ones
        try {
            if (zkUtils != null) {
                zkUtils.close();
            }
        } finally {
            try {
                if (zkClient != null) {
                    zkClient.close();
                }
            } finally {
                try {
                    if (adminClient != null) {
                        adminClient.close();
                        adminClient = null;
                    }
                } finally {
                    if (newAdminClient != null) {
                        newAdminClient.close();
                        newAdminClient = null;
                    }
                }
            }
        }
        log.debug("Closed.");
    }

}