/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.requests.MetadataResponse;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import scala.collection.Iterator;
import scala.collection.Seq;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * Misc Kafka Utils.
 * <p>
 * An instance holds a ZooKeeper client (and the Kafka {@link ZkUtils} wrapper built on it) that is used to
 * administer topics. Instances must be closed after use; the class is {@link AutoCloseable} so it can be used in a
 * try-with-resources statement.
 *
 * @since 9.2
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    /** ZooKeeper connection string used by the no-arg constructor and {@link #kafkaDetected()}. */
    public static final String DEFAULT_ZK_SERVER = "localhost:2181";

    /** Session timeout, in milliseconds, handed to the ZooKeeper client. */
    public static final int ZK_TIMEOUT_MS = 6000;

    /** Connection timeout, in milliseconds, handed to the ZooKeeper client. */
    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

    /** Interval between two checks while waiting for a freshly created topic to become ready. */
    private static final long TOPIC_CREATION_POLL_MS = 100;

    private final ZkClient zkClient;

    private final ZkUtils zkUtils;

    /**
     * Creates an instance connected to {@link #DEFAULT_ZK_SERVER}.
     */
    public KafkaUtils() {
        this(DEFAULT_ZK_SERVER);
    }

    /**
     * Creates an instance connected to the given ZooKeeper server(s).
     *
     * @param zkServers the ZooKeeper connection string
     */
    public KafkaUtils(String zkServers) {
        log.debug("Init zkServers: " + zkServers);
        this.zkClient = createZkClient(zkServers);
        this.zkUtils = createZkUtils(zkServers, zkClient);
    }

    /**
     * Same as {@link #kafkaDetected(String)} using {@link #DEFAULT_ZK_SERVER}.
     */
    public static boolean kafkaDetected() {
        return kafkaDetected(DEFAULT_ZK_SERVER);
    }

    /**
     * Probes the given ZooKeeper server(s) with short (1 second) timeouts.
     * <p>
     * Note that only the ZooKeeper connection is tested, no Kafka broker is contacted.
     *
     * @param zkServers the ZooKeeper connection string
     * @return {@code false} if the connection attempt timed out, {@code true} otherwise
     */
    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$);
            tmp.close();
        } catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    private static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    private static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
    }

    /**
     * Creates a topic with a replication factor of 1.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @throws IllegalArgumentException if the topic already exists
     */
    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, 1);
    }

    /**
     * Creates a topic, waits up to 5 seconds for its partitions to be assigned, then removes any consumer group
     * information registered in ZooKeeper for that topic.
     * <p>
     * A topic that is still not ready when the wait expires is only reported in the log, no exception is raised.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @param replicationFactor the replication factor
     * @throws IllegalArgumentException if the topic already exists
     * @throws RuntimeException if the thread is interrupted while waiting, the interrupt flag is restored
     */
    public void createTopic(String topic, int partitions, int replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
        if (AdminUtils.topicExists(zkUtils, topic)) {
            String msg = "Can not create Topic already exists: " + topic;
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
        AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
                new Properties(), RackAwareMode.Disabled$.MODULE$);
        try {
            waitForTopicCreation(topic, Duration.ofSeconds(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

    /**
     * Polls until all partitions of the topic are assigned or the timeout expires.
     *
     * @return {@code true} if the topic is ready, {@code false} if the timeout expired first
     */
    private boolean waitForTopicCreation(String topic, Duration timeout) throws InterruptedException {
        // if you don't wait for a topic to be ready, this raises LEADER_NOT_AVAILABLE warnings
        // and you can expect lots of rebalancing
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadline) {
            if (allPartitionsAssigned(topic)) {
                // return at once, there is no point in sleeping when the topic is ready
                return true;
            }
            Thread.sleep(TOPIC_CREATION_POLL_MS);
        }
        log.error("Topic: " + topic + " has some uninitialized partitions.");
        return false;
    }

    /**
     * Checks from the ZooKeeper metadata that the topic exists, has partitions and that none of them reports an
     * error.
     */
    private boolean allPartitionsAssigned(String topic) {
        if (!AdminUtils.topicExists(zkUtils, topic)) {
            log.debug("Topic " + topic + " does not exists yet");
            return false;
        }
        MetadataResponse.TopicMetadata meta = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        if (meta.partitionMetadata().isEmpty()) {
            log.debug("Topic " + topic + " has no partition yet");
            return false;
        }
        // Any non-zero code is an error: Kafka's Errors enum uses 0 for NONE and -1 for the unknown server error,
        // so testing "> 0" would wrongly accept a partition in the latter state.
        boolean hasErrors = meta.partitionMetadata().stream().anyMatch(p -> p.error().code() != 0);
        if (hasErrors) {
            log.debug("Topic " + topic + " have some uninitialized partitions");
        }
        return !hasErrors;
    }

    /**
     * Tells whether the topic is registered in ZooKeeper.
     */
    public boolean topicExists(String topic) {
        return AdminUtils.topicExists(zkUtils, topic);
    }

    /**
     * Marks a topic for deletion.
     * <p>
     * Works only if delete.topic.enable is true, which is not the default.
     */
    public void markTopicForDeletion(String topic) {
        log.debug("mark topic for deletion: " + topic);
        AdminUtils.deleteTopic(zkUtils, topic);
    }

    /**
     * Returns the number of partitions found in the ZooKeeper metadata of the topic.
     */
    public int getNumberOfPartitions(String topic) {
        MetadataResponse.TopicMetadata metadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        return metadata.partitionMetadata().size();
    }

    /**
     * Removes the consumer group information registered in ZooKeeper for the topic.
     */
    public void resetConsumerStates(String topic) {
        log.debug("Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

    /**
     * Returns the connection string of every end point of every broker registered in the cluster.
     */
    public Set<String> getBrokerEndPoints() {
        Set<String> ret = new HashSet<>();
        // Scala collections: iterate with the Scala iterators, they are not java.lang.Iterable
        Iterator<Broker> brokers = zkUtils.getAllBrokersInCluster().iterator();
        while (brokers.hasNext()) {
            Broker broker = brokers.next();
            if (broker == null) {
                continue;
            }
            Seq<EndPoint> endPoints = broker.endPoints();
            Iterator<EndPoint> it = endPoints.iterator();
            while (it.hasNext()) {
                ret.add(it.next().connectionString());
            }
        }
        return ret;
    }

    /**
     * Returns the broker end points as a comma separated list, in no particular order.
     */
    public String getDefaultBootstrapServers() {
        return String.join(",", getBrokerEndPoints());
    }

    /**
     * Closes the ZooKeeper resources held by this instance.
     */
    @Override
    public void close() throws Exception {
        try {
            zkUtils.close();
        } finally {
            // always release the client, even when closing the wrapper failed
            zkClient.close();
        }
        log.debug("Closed.");
    }


    /**
     * Computes the partition assignments produced by the Kafka {@link RangeAssignor}.
     *
     * @see #assignments(PartitionAssignor, int, Map)
     */
    public static List<List<MQPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

    /**
     * Computes the partition assignments produced by the Kafka {@link RoundRobinAssignor}.
     *
     * @see #assignments(PartitionAssignor, int, Map)
     */
    public static List<List<MQPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }


    /**
     * Computes how the assignor distributes the partitions of the streams among {@code threads} consumers that all
     * subscribe to every stream.
     * <p>
     * The computation runs against a synthetic cluster description built from the given sizes, no broker is
     * contacted.
     *
     * @param assignor the Kafka assignment strategy
     * @param threads the number of consumers
     * @param streams the number of partitions of each stream, keyed by stream name
     * @return a list of {@code threads} elements, the element at index {@code i} holds the partitions assigned to
     *         consumer {@code i}
     */
    protected static List<List<MQPartition>> assignments(PartitionAssignor assignor, int threads, Map<String, Integer> streams) {
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        // consumers are identified by their index
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts,
                Collections.emptySet(), Collections.emptySet());
        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
        List<List<MQPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i)).partitions().stream()
                    .map(part -> new MQPartition(part.topic(), part.partition()))
                    .collect(Collectors.toList()));
        }
        return ret;
    }

    /**
     * Builds {@code partitions} partition descriptions for the topic, numbered from 0, without leader nor replica
     * information.
     */
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

}