/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.requests.MetadataResponse;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import scala.collection.Iterator;
import scala.collection.Seq;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Misc Kafka utilities.
 *
 * @since 9.2
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);
    private final ZkClient zkClient;
    private final ZkUtils zkUtils;
    public static final String DEFAULT_ZK_SERVER = "localhost:2181";
    public static final int ZK_TIMEOUT_MS = 6000;
    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

    public KafkaUtils() {
        this(DEFAULT_ZK_SERVER);
    }

    public KafkaUtils(String zkServers) {
        log.debug("Init zkServers: " + zkServers);
        this.zkClient = createZkClient(zkServers);
        this.zkUtils = createZkUtils(zkServers, zkClient);
    }

    public static boolean kafkaDetected() {
        return kafkaDetected(DEFAULT_ZK_SERVER);
    }

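    /**
     * Checks whether the ZooKeeper ensemble used by Kafka is reachable at the given address, by opening a
     * short-lived connection with a one second timeout.
     */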
    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, ZKStringSerializer$.MODULE$);
            tmp.close();
        } catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    private static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    private static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, ZK_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
    }

    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, 1);
    }

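    /**
     * Creates a topic with the given number of partitions and replication factor, waits up to 5 seconds for all
     * partitions to be initialized, then clears any consumer group information previously recorded in ZooKeeper
     * for this topic.
     * <p>
     * Usage sketch (topic name and sizes are illustrative):
     * <pre>{@code
     * try (KafkaUtils kafka = new KafkaUtils()) {
     *     kafka.createTopic("mq-doc", 4, 1);
     * }
     * }</pre>
     *
     * @throws IllegalArgumentException if the topic already exists
     */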
    public void createTopic(String topic, int partitions, int replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replication factor: " + replicationFactor);
        if (AdminUtils.topicExists(zkUtils, topic)) {
            String msg = "Cannot create topic, it already exists: " + topic;
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
        AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
                new Properties(), RackAwareMode.Disabled$.MODULE$);
        try {
            waitForTopicCreation(topic, Duration.ofSeconds(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

    private boolean waitForTopicCreation(String topic, Duration timeout) throws InterruptedException {
        // Without waiting for the topic to be ready, the brokers raise LEADER_NOT_AVAILABLE warnings
        // and a lot of rebalancing can be expected.
        final long timeoutMs = timeout.toMillis();
        final long deadline = System.currentTimeMillis() + timeoutMs;
        boolean ret = false;
        while (!ret && System.currentTimeMillis() < deadline) {
            ret = allPartitionsAssigned(topic);
            if (!ret) {
                Thread.sleep(100);
            }
        }
        if (!ret) {
            log.error("Topic: " + topic + " has some uninitialized partitions.");
        }
        return ret;
    }

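    /**
     * Returns {@code true} when the topic exists and none of its partitions report an error in the metadata
     * fetched from ZooKeeper.
     */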
    private boolean allPartitionsAssigned(String topic) {
        if (!AdminUtils.topicExists(zkUtils, topic)) {
            log.debug("Topic " + topic + " does not exist yet");
            return false;
        }
        MetadataResponse.TopicMetadata meta = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        if (meta.partitionMetadata().isEmpty()) {
            log.debug("Topic " + topic + " has no partitions yet");
            return false;
        }
        long errors = meta.partitionMetadata().stream().filter(p -> p.error().code() > 0).count();
        if (errors != 0) {
            log.debug("Topic " + topic + " has some uninitialized partitions");
        }
        return errors == 0;
    }

    public boolean topicExists(String topic) {
        return AdminUtils.topicExists(zkUtils, topic);
    }

    /**
     * Works only if {@code delete.topic.enable} is true, which is not the default.
     */
    public void markTopicForDeletion(String topic) {
        log.debug("Marking topic for deletion: " + topic);
        AdminUtils.deleteTopic(zkUtils, topic);
    }

    public int getNumberOfPartitions(String topic) {
        MetadataResponse.TopicMetadata metadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        return metadata.partitionMetadata().size();
    }

    public void resetConsumerStates(String topic) {
        log.debug("Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic);
    }

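    /**
     * Returns the connection strings (host:port) of all broker endpoints registered in ZooKeeper.
     */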
    public Set<String> getBrokerEndPoints() {
        Set<String> ret = new HashSet<>();
        Seq<Broker> brokers = zkUtils.getAllBrokersInCluster();
        Iterator<Broker> iter = brokers.iterator();
        while (iter.hasNext()) {
            Broker broker = iter.next();
            if (broker != null) {
                Seq<EndPoint> endPoints = broker.endPoints();
                Iterator<EndPoint> iter2 = endPoints.iterator();
                while (iter2.hasNext()) {
                    EndPoint endPoint = iter2.next();
                    ret.add(endPoint.connectionString());
                }
            }
        }
        return ret;
    }

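    /**
     * Returns the broker endpoints joined with commas, in the format expected by the Kafka client
     * {@code bootstrap.servers} property.
     * <p>
     * Usage sketch (given an open {@code kafkaUtils} instance):
     * <pre>{@code
     * Properties props = new Properties();
     * props.put("bootstrap.servers", kafkaUtils.getDefaultBootstrapServers());
     * }</pre>
     */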
    public String getDefaultBootstrapServers() {
        return String.join(",", getBrokerEndPoints());
    }

    @Override
    public void close() throws Exception {
        if (zkUtils != null) {
            zkUtils.close();
        }
        if (zkClient != null) {
            zkClient.close();
        }
        log.debug("Closed.");
    }

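    /**
     * Computes a static assignment of stream partitions to a given number of threads using Kafka's
     * {@link RangeAssignor}, without contacting any broker.
     * <p>
     * Illustrative example (stream name and sizes are hypothetical): with {@code threads = 2} and a single stream
     * "doc" of 4 partitions, thread 0 is assigned partitions 0 and 1, and thread 1 partitions 2 and 3.
     */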
    public static List<List<MQPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RangeAssignor();
        return assignments(assignor, threads, streams);
    }

    public static List<List<MQPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        PartitionAssignor assignor = new RoundRobinAssignor();
        return assignments(assignor, threads, streams);
    }

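    /**
     * Runs the given assignor against a synthetic {@link Cluster} built from the stream/partition map, with one
     * subscription per thread, and returns the assigned partitions grouped by thread index.
     */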
    protected static List<List<MQPartition>> assignments(PartitionAssignor assignor, int threads, Map<String, Integer> streams) {
        final List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts,
                Collections.emptySet(), Collections.emptySet());
        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
        List<List<MQPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i)).partitions().stream()
                    .map(part -> new MQPartition(part.topic(), part.partition()))
                    .collect(Collectors.toList()));
        }
        return ret;
    }

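    /**
     * Builds placeholder {@link PartitionInfo} entries (without leader, replica or in-sync replica information)
     * for the given topic.
     */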
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        Collection<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

}