001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Properties;
029import java.util.Set;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import java.util.stream.Collectors;
034
035import org.I0Itec.zkclient.ZkClient;
036import org.I0Itec.zkclient.ZkConnection;
037import org.I0Itec.zkclient.exception.ZkTimeoutException;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.apache.kafka.clients.admin.AdminClientConfig;
041import org.apache.kafka.clients.admin.CreateTopicsResult;
042import org.apache.kafka.clients.admin.DescribeTopicsResult;
043import org.apache.kafka.clients.admin.NewTopic;
044import org.apache.kafka.clients.consumer.RangeAssignor;
045import org.apache.kafka.clients.consumer.RoundRobinAssignor;
046import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
047import org.apache.kafka.common.Cluster;
048import org.apache.kafka.common.PartitionInfo;
049import org.apache.kafka.common.TopicPartition;
050import org.nuxeo.lib.stream.log.LogPartition;
051
052import kafka.admin.AdminClient;
053import kafka.admin.AdminUtils;
054import kafka.cluster.Broker;
055import kafka.cluster.EndPoint;
056import kafka.coordinator.group.GroupOverview;
057import kafka.utils.ZKStringSerializer$;
058import kafka.utils.ZkUtils;
059import scala.collection.Iterable;
060import scala.collection.IterableLike;
061import scala.collection.Iterator;
062import scala.collection.JavaConversions;
063import scala.collection.Seq;
064
065/**
066 * Misc Kafka Utils
067 *
068 * @since 9.3
069 */
070public class KafkaUtils implements AutoCloseable {
071    private static final Log log = LogFactory.getLog(KafkaUtils.class);
072
073    public static final String ZK_SERVERS_PROP = "kafka.zkServers";
074
075    public static final String DEFAULT_ZK_SERVERS = "localhost:2181";
076
077    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";
078
079    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
080
081    public static final int ZK_TIMEOUT_MS = 6000;
082
083    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
084
085    protected final ZkClient zkClient;
086
087    protected final ZkUtils zkUtils;
088
089    protected final Properties adminProperties;
090
091    protected AdminClient adminClient;
092
093    protected org.apache.kafka.clients.admin.AdminClient newAdminClient;
094
095    protected List<String> allConsumers;
096
097    protected long allConsumersTime;
098
099    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;
100
101    public KafkaUtils() {
102        this(getZkServers(), getDefaultAdminProperties());
103    }
104
105    public KafkaUtils(String zkServers, Properties adminProperties) {
106        log.debug("Init zkServers: " + zkServers);
107        this.zkClient = createZkClient(zkServers);
108        this.zkUtils = createZkUtils(zkServers, zkClient);
109        this.adminProperties = adminProperties;
110    }
111
112    public static Properties getDefaultAdminProperties() {
113        Properties ret = new Properties();
114        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
115        return ret;
116    }
117
118    public static String getZkServers() {
119        return System.getProperty(ZK_SERVERS_PROP, DEFAULT_ZK_SERVERS);
120    }
121
122    public static String getBootstrapServers() {
123        return System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
124    }
125
126    public static boolean kafkaDetected() {
127        return kafkaDetected(getZkServers());
128    }
129
130    public static boolean kafkaDetected(String zkServers) {
131        try {
132            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$);
133            tmp.close();
134        } catch (ZkTimeoutException e) {
135            return false;
136        }
137        return true;
138    }
139
140    protected static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
141        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
142    }
143
144    protected static ZkClient createZkClient(String zkServers) {
145        return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
146    }
147
148    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
149        PartitionAssignor assignor = new RangeAssignor();
150        return assignments(assignor, threads, streams);
151    }
152
153    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
154        PartitionAssignor assignor = new RoundRobinAssignor();
155        return assignments(assignor, threads, streams);
156    }
157
158    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
159            Map<String, Integer> streams) {
160        final List<PartitionInfo> parts = new ArrayList<>();
161        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
162        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
163        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
164        for (int i = 0; i < threads; i++) {
165            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
166        }
167        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
168                Collections.emptySet());
169        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
170        List<List<LogPartition>> ret = new ArrayList<>(threads);
171        for (int i = 0; i < threads; i++) {
172            ret.add(assignments.get(String.valueOf(i))
173                               .partitions()
174                               .stream()
175                               .map(part -> new LogPartition(part.topic(), part.partition()))
176                               .collect(Collectors.toList()));
177        }
178        return ret;
179    }
180
181    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
182        Collection<PartitionInfo> ret = new ArrayList<>();
183        for (int i = 0; i < partitions; i++) {
184            ret.add(new PartitionInfo(topic, i, null, null, null));
185        }
186        return ret;
187    }
188
189    public void createTopicWithoutReplication(String topic, int partitions) {
190        createTopic(topic, partitions, (short) 1);
191    }
192
193    public void createTopic(String topic, int partitions, short replicationFactor) {
194        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
195        if (topicExists(topic)) {
196            throw new IllegalArgumentException("Cannot create Topic already exists: " + topic);
197        }
198        CreateTopicsResult ret = getNewAdminClient().createTopics(
199                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
200        try {
201            ret.all().get(5, TimeUnit.MINUTES);
202        } catch (InterruptedException e) {
203            Thread.currentThread().interrupt();
204            throw new RuntimeException(e);
205        } catch (ExecutionException e) {
206            throw new RuntimeException(e);
207        } catch (TimeoutException e) {
208            throw new RuntimeException("Unable to create topics " + topic + " within the timeout", e);
209        }
210    }
211
212    public boolean topicExists(String topic) {
213        return AdminUtils.topicExists(zkUtils, topic);
214    }
215
216    public List<String> listTopics() {
217        Seq<String> topics = zkUtils.getAllTopics();
218        return JavaConversions.seqAsJavaList(topics);
219    }
220
221    public List<String> listConsumers(String topic) {
222        return listAllConsumers().stream().filter(consumer -> getConsumerTopics(consumer).contains(topic)).collect(
223                Collectors.toList());
224    }
225
226    protected List<String> getConsumerTopics(String group) {
227        return JavaConversions.mapAsJavaMap(getAdminClient().listGroupOffsets(group))
228                              .keySet()
229                              .stream()
230                              .map(TopicPartition::topic)
231                              .collect(Collectors.toList());
232    }
233
234    protected org.apache.kafka.clients.admin.AdminClient getNewAdminClient() {
235        if (newAdminClient == null) {
236            newAdminClient = org.apache.kafka.clients.admin.AdminClient.create(adminProperties);
237        }
238        return newAdminClient;
239    }
240
241    protected AdminClient getAdminClient() {
242        if (adminClient == null) {
243            adminClient = AdminClient.create(adminProperties);
244        }
245        return adminClient;
246    }
247
248    public List<String> listAllConsumers() {
249        long now = System.currentTimeMillis();
250        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
251            allConsumers = new ArrayList<>();
252            // this returns only consumer group that use the subscribe API (known by coordinator)
253            scala.collection.immutable.List<GroupOverview> groups = getAdminClient().listAllConsumerGroupsFlattened();
254            Iterator<GroupOverview> iterator = groups.iterator();
255            GroupOverview group;
256            while (iterator.hasNext()) {
257                group = iterator.next();
258                if (group != null && !allConsumers.contains(group.groupId())) {
259                    allConsumers.add(group.groupId());
260                }
261            }
262            if (!allConsumers.isEmpty()) {
263                allConsumersTime = now;
264            }
265        }
266        return allConsumers;
267    }
268
269    /**
270     * Work only if delete.topic.enable is true which is not the default
271     */
272    public void markTopicForDeletion(String topic) {
273        log.debug("mark topic for deletion: " + topic);
274        AdminUtils.deleteTopic(zkUtils, topic);
275    }
276
277    public int getNumberOfPartitions(String topic) {
278        DescribeTopicsResult descriptions = getNewAdminClient().describeTopics(Collections.singletonList(topic));
279        try {
280            return descriptions.values().get(topic).get().partitions().size();
281        } catch (InterruptedException e) {
282            Thread.currentThread().interrupt();
283            throw new RuntimeException(e);
284        } catch (ExecutionException e) {
285            throw new RuntimeException(e);
286        }
287    }
288
289    public void resetConsumerStates(String topic) {
290        log.debug("Resetting consumer states");
291        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
292    }
293
294    public Set<String> getBrokerEndPoints() {
295        Set<String> ret = new HashSet<>();
296        // don't use "Seq<Broker> brokers" as it causes compilation issues with Eclipse
297        // when calling brokers.iterator()
298        // (The method iterator() is ambiguous for the type Seq<Broker>)
299        IterableLike<Broker, Iterable<Broker>> brokers = zkUtils.getAllBrokersInCluster();
300        Broker broker;
301        Iterator<Broker> iterator = brokers.iterator();
302        while (iterator.hasNext()) {
303            broker = iterator.next();
304            if (broker != null) {
305                // don't use "Seq<EndPoint> endPoints" as it causes compilation issues with Eclipse
306                // when calling endPoints.iterator()
307                // (The method iterator() is ambiguous for the type Seq<EndPoint>)
308                IterableLike<EndPoint, Iterable<EndPoint>> endPoints = broker.endPoints();
309                Iterator<EndPoint> iterator2 = endPoints.iterator();
310                while (iterator2.hasNext()) {
311                    EndPoint endPoint = iterator2.next();
312                    ret.add(endPoint.connectionString());
313                }
314            }
315        }
316        return ret;
317    }
318
319    public String getDefaultBootstrapServers() {
320        return getBrokerEndPoints().stream().collect(Collectors.joining(","));
321    }
322
323    @Override
324    public void close() {
325        if (zkUtils != null) {
326            zkUtils.close();
327        }
328        if (zkClient != null) {
329            zkClient.close();
330        }
331        if (adminClient != null) {
332            adminClient.close();
333            adminClient = null;
334        }
335        if (newAdminClient != null) {
336            newAdminClient.close();
337            newAdminClient = null;
338        }
339        log.debug("Closed.");
340    }
341
342}