001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Properties; 029import java.util.Set; 030import java.util.concurrent.ExecutionException; 031import java.util.stream.Collectors; 032 033import org.I0Itec.zkclient.ZkClient; 034import org.I0Itec.zkclient.ZkConnection; 035import org.I0Itec.zkclient.exception.ZkTimeoutException; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.apache.kafka.clients.admin.DescribeTopicsResult; 039import org.apache.kafka.clients.admin.NewTopic; 040import org.apache.kafka.clients.consumer.RangeAssignor; 041import org.apache.kafka.clients.consumer.RoundRobinAssignor; 042import org.apache.kafka.clients.consumer.internals.PartitionAssignor; 043import org.apache.kafka.common.Cluster; 044import org.apache.kafka.common.PartitionInfo; 045import org.apache.kafka.common.TopicPartition; 046import org.nuxeo.lib.stream.log.LogPartition; 047 048import kafka.admin.AdminClient; 049import kafka.admin.AdminUtils; 050import kafka.cluster.Broker; 051import kafka.cluster.EndPoint; 052import kafka.coordinator.group.GroupOverview; 053import kafka.utils.ZKStringSerializer$; 054import kafka.utils.ZkUtils; 055import scala.collection.Iterable; 056import scala.collection.IterableLike; 057import scala.collection.Iterator; 058import scala.collection.JavaConversions; 059import scala.collection.Seq; 060 061/** 062 * Misc Kafka Utils 063 * 064 * @since 9.3 065 */ 066public class KafkaUtils implements AutoCloseable { 067 public static final String DEFAULT_ZK_SERVER = "localhost:2181"; 068 069 public static final int ZK_TIMEOUT_MS = 6000; 070 071 public static final int ZK_CONNECTION_TIMEOUT_MS = 10000; 072 073 private static final Log log = LogFactory.getLog(KafkaUtils.class); 074 075 protected final ZkClient zkClient; 076 077 protected final ZkUtils zkUtils; 078 079 public KafkaUtils() { 080 this(DEFAULT_ZK_SERVER); 081 } 082 083 public KafkaUtils(String zkServers) { 084 log.debug("Init zkServers: " + zkServers); 085 this.zkClient = createZkClient(zkServers); 086 this.zkUtils = createZkUtils(zkServers, zkClient); 087 } 088 089 public static boolean kafkaDetected() { 090 return kafkaDetected(DEFAULT_ZK_SERVER); 091 } 092 093 public static boolean kafkaDetected(String zkServers) { 094 try { 095 ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$); 096 tmp.close(); 097 } catch (ZkTimeoutException e) { 098 return false; 099 } 100 return true; 101 } 102 103 protected static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) { 104 return new ZkUtils(zkClient, new ZkConnection(zkServers), false); 105 } 106 107 protected static ZkClient createZkClient(String zkServers) { 108 return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$); 109 } 110 111 public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) { 112 PartitionAssignor assignor = new RangeAssignor(); 113 return assignments(assignor, threads, streams); 114 } 115 116 public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) { 117 PartitionAssignor assignor = new RoundRobinAssignor(); 118 return assignments(assignor, threads, streams); 119 } 120 121 protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads, 122 Map<String, Integer> streams) { 123 final List<PartitionInfo> parts = new ArrayList<>(); 124 streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size))); 125 Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); 126 List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList()); 127 for (int i = 0; i < threads; i++) { 128 subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames)); 129 } 130 Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(), 131 Collections.emptySet()); 132 Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions); 133 List<List<LogPartition>> ret = new ArrayList<>(threads); 134 for (int i = 0; i < threads; i++) { 135 ret.add(assignments.get(String.valueOf(i)) 136 .partitions() 137 .stream() 138 .map(part -> new LogPartition(part.topic(), part.partition())) 139 .collect(Collectors.toList())); 140 } 141 return ret; 142 } 143 144 protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) { 145 Collection<PartitionInfo> ret = new ArrayList<>(); 146 for (int i = 0; i < partitions; i++) { 147 ret.add(new PartitionInfo(topic, i, null, null, null)); 148 } 149 return ret; 150 } 151 152 public void createTopicWithoutReplication(Properties properties, String topic, int partitions) { 153 createTopic(properties, topic, partitions, (short) 1); 154 } 155 156 public void createTopic(Properties properties, String topic, int partitions, short replicationFactor) { 157 log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor); 158 if (AdminUtils.topicExists(zkUtils, topic)) { 159 String msg = "Cannot create Topic already exists: " + topic; 160 log.error(msg); 161 throw new IllegalArgumentException(msg); 162 } 163 try (org.apache.kafka.clients.admin.AdminClient client = org.apache.kafka.clients.admin.AdminClient.create( 164 properties)) { 165 client.createTopics(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor))); 166 } 167 } 168 169 public boolean topicExists(String topic) { 170 return AdminUtils.topicExists(zkUtils, topic); 171 } 172 173 public List<String> listTopics() { 174 Seq<String> topics = zkUtils.getAllTopics(); 175 return JavaConversions.seqAsJavaList(topics); 176 } 177 178 public List<String> listConsumers(Properties props, String topic) { 179 return listAllConsumers(props).stream() 180 .filter(consumer -> getConsumerTopics(props, consumer).contains(topic)) 181 .collect(Collectors.toList()); 182 } 183 184 protected List<String> getConsumerTopics(Properties props, String group) { 185 AdminClient client = AdminClient.create(props); 186 return JavaConversions.mapAsJavaMap(client.listGroupOffsets(group)) 187 .keySet() 188 .stream() 189 .map(TopicPartition::topic) 190 .collect(Collectors.toList()); 191 } 192 193 public List<String> listAllConsumers(Properties props) { 194 List<String> ret = new ArrayList<>(); 195 AdminClient client = AdminClient.create(props); 196 // this returns only consumer group that use the subscribe API (known by coordinator) 197 scala.collection.immutable.List<GroupOverview> groups = client.listAllConsumerGroupsFlattened(); 198 Iterator<GroupOverview> iter = groups.iterator(); 199 GroupOverview group; 200 while (iter.hasNext()) { 201 group = iter.next(); 202 if (group != null) { 203 ret.add(group.groupId()); 204 } 205 } 206 return ret; 207 } 208 209 /** 210 * Work only if delete.topic.enable is true which is not the default 211 */ 212 public void markTopicForDeletion(String topic) { 213 log.debug("mark topic for deletion: " + topic); 214 AdminUtils.deleteTopic(zkUtils, topic); 215 } 216 217 public int getNumberOfPartitions(Properties properties, String topic) { 218 try (org.apache.kafka.clients.admin.AdminClient client = org.apache.kafka.clients.admin.AdminClient.create( 219 properties)) { 220 DescribeTopicsResult descriptions = client.describeTopics(Collections.singletonList(topic)); 221 try { 222 return descriptions.values().get(topic).get().partitions().size(); 223 } catch (InterruptedException e) { 224 Thread.currentThread().interrupt(); 225 throw new RuntimeException(e); 226 } catch (ExecutionException e) { 227 throw new RuntimeException(e); 228 } 229 } 230 } 231 232 public void resetConsumerStates(String topic) { 233 log.debug("Resetting consumer states"); 234 AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic); 235 } 236 237 public Set<String> getBrokerEndPoints() { 238 Set<String> ret = new HashSet<>(); 239 // don't use "Seq<Broker> brokers" as it causes compilation issues with Eclipse 240 // when calling brokers.iterator() 241 // (The method iterator() is ambiguous for the type Seq<Broker>) 242 IterableLike<Broker, Iterable<Broker>> brokers = zkUtils.getAllBrokersInCluster(); 243 Broker broker; 244 Iterator<Broker> iter = brokers.iterator(); 245 while (iter.hasNext()) { 246 broker = iter.next(); 247 if (broker != null) { 248 // don't use "Seq<EndPoint> endPoints" as it causes compilation issues with Eclipse 249 // when calling endPoints.iterator() 250 // (The method iterator() is ambiguous for the type Seq<EndPoint>) 251 IterableLike<EndPoint, Iterable<EndPoint>> endPoints = broker.endPoints(); 252 Iterator<EndPoint> iter2 = endPoints.iterator(); 253 while (iter2.hasNext()) { 254 EndPoint endPoint = iter2.next(); 255 ret.add(endPoint.connectionString()); 256 } 257 } 258 } 259 return ret; 260 } 261 262 public String getDefaultBootstrapServers() { 263 return getBrokerEndPoints().stream().collect(Collectors.joining(",")); 264 } 265 266 @Override 267 public void close() { 268 if (zkUtils != null) { 269 zkUtils.close(); 270 } 271 if (zkClient != null) { 272 zkClient.close(); 273 } 274 log.debug("Closed."); 275 } 276 277}