001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.log.LogPartition;
052
053/**
054 * Misc Kafka Utils
055 *
056 * @since 9.3
057 */
058public class KafkaUtils implements AutoCloseable {
059    private static final Log log = LogFactory.getLog(KafkaUtils.class);
060
061    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";
062
063    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
064
065    protected final AdminClient adminClient;
066
067    protected volatile List<String> allConsumers;
068
069    protected volatile long allConsumersTime;
070
071    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;
072
073    protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S = 5;
074
075    public KafkaUtils() {
076        this(getDefaultAdminProperties());
077    }
078
079    public KafkaUtils(Properties adminProperties) {
080        this.adminClient = AdminClient.create(adminProperties);
081    }
082
083    public static Properties getDefaultAdminProperties() {
084        Properties ret = new Properties();
085        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
086        return ret;
087    }
088
089    public static String getBootstrapServers() {
090        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
091        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
092            bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
093        }
094        return bootstrapServers;
095    }
096
097    @SuppressWarnings("TryFinallyCanBeTryWithResources")
098    public static boolean kafkaDetected() {
099        AdminClient client = AdminClient.create(getDefaultAdminProperties());
100        try {
101            client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
102            return true;
103        } catch (InterruptedException e) {
104            Thread.currentThread().interrupt();
105            throw new StreamRuntimeException(e);
106        } catch (ExecutionException e) {
107            throw new StreamRuntimeException(e);
108        } catch (TimeoutException e) {
109            return false;
110        } finally {
111            // cannot use try with resource because of timeout
112            client.close(0, TimeUnit.SECONDS);
113        }
114    }
115
116    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
117        PartitionAssignor assignor = new RangeAssignor();
118        return assignments(assignor, threads, streams);
119    }
120
121    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
122        PartitionAssignor assignor = new RoundRobinAssignor();
123        return assignments(assignor, threads, streams);
124    }
125
126    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
127            Map<String, Integer> streams) {
128        final List<PartitionInfo> parts = new ArrayList<>();
129        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
130        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
131        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
132        for (int i = 0; i < threads; i++) {
133            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
134        }
135        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
136                Collections.emptySet());
137        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
138        List<List<LogPartition>> ret = new ArrayList<>(threads);
139        for (int i = 0; i < threads; i++) {
140            ret.add(assignments.get(String.valueOf(i))
141                               .partitions()
142                               .stream()
143                               .map(part -> new LogPartition(part.topic(), part.partition()))
144                               .collect(Collectors.toList()));
145        }
146        return ret;
147    }
148
149    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
150        Collection<PartitionInfo> ret = new ArrayList<>();
151        for (int i = 0; i < partitions; i++) {
152            ret.add(new PartitionInfo(topic, i, null, null, null));
153        }
154        return ret;
155    }
156
157    public void createTopicWithoutReplication(String topic, int partitions) {
158        createTopic(topic, partitions, (short) 1);
159    }
160
161    public void createTopic(String topic, int partitions, short replicationFactor) {
162        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
163        if (topicExists(topic)) {
164            throw new IllegalArgumentException("Cannot create Topic already exists: " + topic);
165        }
166        CreateTopicsResult ret = adminClient.createTopics(
167                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
168        try {
169            ret.all().get(5, TimeUnit.MINUTES);
170        } catch (InterruptedException e) {
171            Thread.currentThread().interrupt();
172            throw new StreamRuntimeException(e);
173        } catch (ExecutionException e) {
174            throw new StreamRuntimeException(e);
175        } catch (TimeoutException e) {
176            throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e);
177        }
178    }
179
180    public boolean topicExists(String topic) {
181        return partitions(topic) > 0;
182    }
183
184    public int partitions(String topic) {
185        try {
186            TopicDescription desc = adminClient.describeTopics(Collections.singletonList(topic))
187                                               .values()
188                                               .get(topic)
189                                               .get();
190            if (log.isDebugEnabled()) {
191                log.debug(String.format("Topic %s exists: %s", topic, desc));
192            }
193            return desc.partitions().size();
194        } catch (InterruptedException e) {
195            Thread.currentThread().interrupt();
196            throw new StreamRuntimeException(e);
197        } catch (ExecutionException e) {
198            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
199                return -1;
200            }
201            throw new StreamRuntimeException(e);
202        }
203    }
204
205    public Set<String> listTopics() {
206        try {
207            return adminClient.listTopics().names().get();
208        } catch (InterruptedException e) {
209            Thread.currentThread().interrupt();
210            throw new StreamRuntimeException(e);
211        } catch (ExecutionException e) {
212            throw new StreamRuntimeException(e);
213        }
214    }
215
216    public List<String> listConsumers(String topic) {
217        return listAllConsumers().stream()
218                                 .filter(consumer -> getConsumerTopics(consumer).contains(topic))
219                                 .collect(Collectors.toList());
220    }
221
222    protected List<String> getConsumerTopics(String group) {
223        try {
224            return adminClient.listConsumerGroupOffsets(group)
225                              .partitionsToOffsetAndMetadata()
226                              .get()
227                              .keySet()
228                              .stream()
229                              .map(TopicPartition::topic)
230                              .collect(Collectors.toList());
231        } catch (InterruptedException e) {
232            Thread.currentThread().interrupt();
233            throw new StreamRuntimeException(e);
234        } catch (ExecutionException e) {
235            throw new StreamRuntimeException(e);
236        }
237    }
238
239    public synchronized List<String> listAllConsumers() {
240        long now = System.currentTimeMillis();
241        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
242            try {
243                allConsumers = adminClient.listConsumerGroups()
244                                          .all()
245                                          .get()
246                                          .stream()
247                                          .map(ConsumerGroupListing::groupId)
248                                          .collect(Collectors.toList());
249            } catch (InterruptedException e) {
250                Thread.currentThread().interrupt();
251                throw new StreamRuntimeException(e);
252            } catch (ExecutionException e) {
253                throw new StreamRuntimeException(e);
254            }
255            if (!allConsumers.isEmpty()) {
256                allConsumersTime = now;
257            }
258        }
259        return allConsumers;
260    }
261
262    public int getNumberOfPartitions(String topic) {
263        DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
264        try {
265            return descriptions.values().get(topic).get().partitions().size();
266        } catch (InterruptedException e) {
267            Thread.currentThread().interrupt();
268            throw new StreamRuntimeException(e);
269        } catch (ExecutionException e) {
270            throw new StreamRuntimeException(e);
271        }
272    }
273
274    @Override
275    public void close() {
276        adminClient.close(ADMIN_CLIENT_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
277        log.debug("Closed.");
278    }
279
280}