/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.nuxeo.lib.stream.log.LogPartition;

import kafka.admin.AdminClient;
import kafka.admin.AdminUtils;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.coordinator.group.GroupOverview;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
 * Miscellaneous Kafka utilities.
 *
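 * A minimal usage sketch, assuming a ZooKeeper server reachable at the default {@code localhost:2181} address:
 *
 * <pre>{@code
 * try (KafkaUtils kafka = new KafkaUtils()) {
 *     Properties props = new Properties();
 *     props.put("bootstrap.servers", kafka.getDefaultBootstrapServers());
 *     if (!kafka.topicExists("my-topic")) {
 *         kafka.createTopicWithoutReplication(props, "my-topic", 4);
 *     }
 * }
 * }</pre>
 *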
 * @since 9.3
 */
public class KafkaUtils implements AutoCloseable {
    public static final String DEFAULT_ZK_SERVER = "localhost:2181";

    public static final int ZK_TIMEOUT_MS = 6000;

    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    protected final ZkClient zkClient;

    protected final ZkUtils zkUtils;

    public KafkaUtils() {
        this(DEFAULT_ZK_SERVER);
    }

    public KafkaUtils(String zkServers) {
        log.debug("Init zkServers: " + zkServers);
        this.zkClient = createZkClient(zkServers);
        this.zkUtils = createZkUtils(zkServers, zkClient);
    }

    public static boolean kafkaDetected() {
        return kafkaDetected(DEFAULT_ZK_SERVER);
    }

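    /**
     * Returns {@code true} if a ZooKeeper server answers at the given address within 1s, which is used as a cheap
     * probe for a working Kafka installation.
     */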
    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$);
            tmp.close();
        } catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    protected static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    protected static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
    }

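    /**
     * Computes a thread to partition assignment using Kafka's {@link RangeAssignor} strategy.
     */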
    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

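    /**
     * Computes a thread to partition assignment using Kafka's {@link RoundRobinAssignor} strategy.
     */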
    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }

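    /**
     * Simulates a consumer group rebalancing: builds a fake {@link Cluster} holding the partitions of all streams and
     * one subscription per thread, then lets the given assignor distribute the partitions among the threads.
     */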
    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
            Map<String, Integer> streams) {
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
                Collections.emptySet());
        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
        List<List<LogPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i))
                               .partitions()
                               .stream()
                               .map(part -> new LogPartition(part.topic(), part.partition()))
                               .collect(Collectors.toList()));
        }
        return ret;
    }

    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    public void createTopicWithoutReplication(Properties properties, String topic, int partitions) {
        createTopic(properties, topic, partitions, (short) 1);
    }

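    /**
     * Creates a topic with the given number of partitions and replication factor, failing if it already exists.
     */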
    public void createTopic(Properties properties, String topic, int partitions, short replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replication factor: "
                + replicationFactor);
        if (AdminUtils.topicExists(zkUtils, topic)) {
            String msg = "Cannot create topic, it already exists: " + topic;
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
        try (org.apache.kafka.clients.admin.AdminClient client = org.apache.kafka.clients.admin.AdminClient.create(
                properties)) {
            CreateTopicsResult ret = client.createTopics(
                    Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
            // createTopics is asynchronous, wait for the creation to complete before returning
            try {
                ret.all().get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
168
169    public boolean topicExists(String topic) {
170        return AdminUtils.topicExists(zkUtils, topic);
171    }
172
173    public List<String> listTopics() {
174        Seq<String> topics = zkUtils.getAllTopics();
175        return JavaConversions.seqAsJavaList(topics);
176    }
177
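    /**
     * Lists the consumer groups that have committed offsets on the given topic.
     */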
    public List<String> listConsumers(Properties props, String topic) {
        return listAllConsumers(props).stream()
                                      .filter(consumer -> getConsumerTopics(props, consumer).contains(topic))
                                      .collect(Collectors.toList());
    }

    protected List<String> getConsumerTopics(Properties props, String group) {
        AdminClient client = AdminClient.create(props);
        try {
            return JavaConversions.mapAsJavaMap(client.listGroupOffsets(group))
                                  .keySet()
                                  .stream()
                                  .map(TopicPartition::topic)
                                  .collect(Collectors.toList());
        } finally {
            client.close();
        }
    }

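    /**
     * Lists the consumer groups known by the coordinator, that is, only the consumers using the subscribe API.
     */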
    public List<String> listAllConsumers(Properties props) {
        List<String> ret = new ArrayList<>();
        AdminClient client = AdminClient.create(props);
        try {
            // this returns only the consumer groups that use the subscribe API (known by the coordinator)
            scala.collection.immutable.List<GroupOverview> groups = client.listAllConsumerGroupsFlattened();
            Iterator<GroupOverview> iter = groups.iterator();
            GroupOverview group;
            while (iter.hasNext()) {
                group = iter.next();
                if (group != null) {
                    ret.add(group.groupId());
                }
            }
        } finally {
            client.close();
        }
        return ret;
    }

    /**
     * Works only if {@code delete.topic.enable} is set to {@code true} on the brokers, which is not the default.
     */
    public void markTopicForDeletion(String topic) {
        log.debug("mark topic for deletion: " + topic);
        AdminUtils.deleteTopic(zkUtils, topic);
    }

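    /**
     * Returns the number of partitions of a topic, using the Kafka {@code AdminClient} describe API.
     */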
    public int getNumberOfPartitions(Properties properties, String topic) {
        try (org.apache.kafka.clients.admin.AdminClient client = org.apache.kafka.clients.admin.AdminClient.create(
                properties)) {
            DescribeTopicsResult descriptions = client.describeTopics(Collections.singletonList(topic));
            try {
                return descriptions.values().get(topic).get().partitions().size();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

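    /**
     * Deletes the consumer group information stored in ZooKeeper for the given topic; note that this only affects
     * consumers committing their offsets to ZooKeeper.
     */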
    public void resetConsumerStates(String topic) {
        log.debug("Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

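    /**
     * Returns the connection strings ({@code host:port}) of all the brokers registered in the cluster.
     */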
    public Set<String> getBrokerEndPoints() {
        Set<String> ret = new HashSet<>();
        // don't use "Seq<Broker> brokers" as it causes compilation issues with Eclipse
        // when calling brokers.iterator()
        // (The method iterator() is ambiguous for the type Seq<Broker>)
        IterableLike<Broker, Iterable<Broker>> brokers = zkUtils.getAllBrokersInCluster();
        Broker broker;
        Iterator<Broker> iter = brokers.iterator();
        while (iter.hasNext()) {
            broker = iter.next();
            if (broker != null) {
                // don't use "Seq<EndPoint> endPoints" as it causes compilation issues with Eclipse
                // when calling endPoints.iterator()
                // (The method iterator() is ambiguous for the type Seq<EndPoint>)
                IterableLike<EndPoint, Iterable<EndPoint>> endPoints = broker.endPoints();
                Iterator<EndPoint> iter2 = endPoints.iterator();
                while (iter2.hasNext()) {
                    EndPoint endPoint = iter2.next();
                    ret.add(endPoint.connectionString());
                }
            }
        }
        return ret;
    }

    public String getDefaultBootstrapServers() {
        return String.join(",", getBrokerEndPoints());
    }

    @Override
    public void close() {
        if (zkUtils != null) {
            zkUtils.close();
        }
        if (zkClient != null) {
            zkClient.close();
        }
        log.debug("Closed.");
    }

}