001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.time.Duration;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Properties;
029import java.util.Set;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import java.util.stream.Collectors;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.kafka.clients.admin.AdminClient;
038import org.apache.kafka.clients.admin.AdminClientConfig;
039import org.apache.kafka.clients.admin.ConsumerGroupListing;
040import org.apache.kafka.clients.admin.CreateTopicsResult;
041import org.apache.kafka.clients.admin.DeleteTopicsResult;
042import org.apache.kafka.clients.admin.DescribeTopicsResult;
043import org.apache.kafka.clients.admin.NewTopic;
044import org.apache.kafka.clients.admin.TopicDescription;
045import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
046import org.apache.kafka.clients.consumer.RangeAssignor;
047import org.apache.kafka.clients.consumer.RoundRobinAssignor;
048import org.apache.kafka.common.Cluster;
049import org.apache.kafka.common.PartitionInfo;
050import org.apache.kafka.common.TopicPartition;
051import org.apache.kafka.common.errors.TopicExistsException;
052import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
053import org.nuxeo.lib.stream.StreamRuntimeException;
054import org.nuxeo.lib.stream.log.LogPartition;
055import org.nuxeo.lib.stream.log.Name;
056
057/**
058 * Misc Kafka Utils
059 *
060 * @since 9.3
061 */
062public class KafkaUtils implements AutoCloseable {
063    private static final Log log = LogFactory.getLog(KafkaUtils.class);
064
065    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";
066
067    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
068
069    protected final AdminClient adminClient;
070
071    protected volatile List<String> allConsumers;
072
073    protected volatile long allConsumersTime;
074
075    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;
076
077    protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S = 5;
078
079    public KafkaUtils() {
080        this(getDefaultAdminProperties());
081    }
082
083    public KafkaUtils(Properties adminProperties) {
084        this.adminClient = AdminClient.create(adminProperties);
085    }
086
087    public static Properties getDefaultAdminProperties() {
088        Properties ret = new Properties();
089        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
090        ret.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10_000);
091        return ret;
092    }
093
094    public static String getBootstrapServers() {
095        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
096        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
097            bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
098        }
099        return bootstrapServers;
100    }
101
102    @SuppressWarnings("TryFinallyCanBeTryWithResources")
103    public static boolean kafkaDetected() {
104        AdminClient client = AdminClient.create(getDefaultAdminProperties());
105        try {
106            client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
107            return true;
108        } catch (InterruptedException e) {
109            Thread.currentThread().interrupt();
110            throw new StreamRuntimeException(e);
111        } catch (ExecutionException e) {
112            throw new StreamRuntimeException(e);
113        } catch (TimeoutException e) {
114            return false;
115        } finally {
116            // cannot use try with resource because of timeout
117            client.close(Duration.ofSeconds(1));
118        }
119    }
120
121    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
122        RangeAssignor assignor = new RangeAssignor();
123        return assignments(assignor, threads, streams);
124    }
125
126    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
127        RoundRobinAssignor assignor = new RoundRobinAssignor();
128        return assignments(assignor, threads, streams);
129    }
130
131    protected static List<List<LogPartition>> assignments(ConsumerPartitionAssignor assignor, int threads,
132            Map<String, Integer> streams) {
133        final List<PartitionInfo> parts = new ArrayList<>();
134        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
135        Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
136        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
137        for (int i = 0; i < threads; i++) {
138            subscriptions.put(String.valueOf(i), new ConsumerPartitionAssignor.Subscription(streamNames));
139        }
140        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
141                Collections.emptySet());
142        Map<String, ConsumerPartitionAssignor.Assignment> assignments = assignor.assign(cluster,
143                new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
144        List<List<LogPartition>> ret = new ArrayList<>(threads);
145        for (int i = 0; i < threads; i++) {
146            ret.add(assignments.get(String.valueOf(i))
147                               .partitions()
148                               .stream()
149                               .map(part -> new LogPartition(Name.ofUrn(part.topic()), part.partition()))
150                               .collect(Collectors.toList()));
151        }
152        return ret;
153    }
154
155    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
156        Collection<PartitionInfo> ret = new ArrayList<>();
157        for (int i = 0; i < partitions; i++) {
158            ret.add(new PartitionInfo(topic, i, null, null, null));
159        }
160        return ret;
161    }
162
163    public void createTopicWithoutReplication(String topic, int partitions) {
164        createTopic(topic, partitions, (short) 1);
165    }
166
167    public void createTopic(String topic, int partitions, short replicationFactor) {
168        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
169        CreateTopicsResult ret = adminClient.createTopics(
170                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
171        try {
172            ret.all().get(5, TimeUnit.MINUTES);
173        } catch (InterruptedException e) {
174            Thread.currentThread().interrupt();
175            throw new StreamRuntimeException(e);
176        } catch (ExecutionException e) {
177            if ((e.getCause() instanceof TopicExistsException)) {
178                log.info("topic: " + topic + " exists");
179            } else {
180                throw new StreamRuntimeException(e);
181            }
182        } catch (TimeoutException e) {
183            throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e);
184        }
185    }
186
187    public boolean topicExists(String topic) {
188        return partitions(topic) > 0;
189    }
190
191    public int partitions(String topic) {
192        try {
193            TopicDescription desc = adminClient.describeTopics(Collections.singletonList(topic))
194                                               .values()
195                                               .get(topic)
196                                               .get();
197            if (log.isDebugEnabled()) {
198                log.debug(String.format("Topic %s exists: %s", topic, desc));
199            }
200            return desc.partitions().size();
201        } catch (InterruptedException e) {
202            Thread.currentThread().interrupt();
203            throw new StreamRuntimeException(e);
204        } catch (ExecutionException e) {
205            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
206                return -1;
207            }
208            throw new StreamRuntimeException(e);
209        }
210    }
211
212    public Set<String> listTopics() {
213        try {
214            return adminClient.listTopics().names().get();
215        } catch (InterruptedException e) {
216            Thread.currentThread().interrupt();
217            throw new StreamRuntimeException(e);
218        } catch (ExecutionException e) {
219            throw new StreamRuntimeException(e);
220        }
221    }
222
223    public List<String> listConsumers(String topic) {
224        return listAllConsumers().stream()
225                                 .filter(consumer -> getConsumerTopics(consumer).contains(topic))
226                                 .collect(Collectors.toList());
227    }
228
229    protected List<String> getConsumerTopics(String group) {
230        try {
231            return adminClient.listConsumerGroupOffsets(group)
232                              .partitionsToOffsetAndMetadata()
233                              .get()
234                              .keySet()
235                              .stream()
236                              .map(TopicPartition::topic)
237                              .collect(Collectors.toList());
238        } catch (InterruptedException e) {
239            Thread.currentThread().interrupt();
240            throw new StreamRuntimeException(e);
241        } catch (ExecutionException e) {
242            throw new StreamRuntimeException(e);
243        }
244    }
245
246    public synchronized List<String> listAllConsumers() {
247        long now = System.currentTimeMillis();
248        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
249            try {
250                allConsumers = adminClient.listConsumerGroups()
251                                          .all()
252                                          .get()
253                                          .stream()
254                                          .map(ConsumerGroupListing::groupId)
255                                          .collect(Collectors.toList());
256            } catch (InterruptedException e) {
257                Thread.currentThread().interrupt();
258                throw new StreamRuntimeException(e);
259            } catch (ExecutionException e) {
260                throw new StreamRuntimeException(e);
261            }
262            if (!allConsumers.isEmpty()) {
263                allConsumersTime = now;
264            }
265        }
266        return allConsumers;
267    }
268
269    public int getNumberOfPartitions(String topic) {
270        DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
271        try {
272            return descriptions.values().get(topic).get().partitions().size();
273        } catch (InterruptedException e) {
274            Thread.currentThread().interrupt();
275            throw new StreamRuntimeException(e);
276        } catch (ExecutionException e) {
277            throw new StreamRuntimeException(e);
278        }
279    }
280
281    @Override
282    public void close() {
283        adminClient.close(Duration.ofSeconds(ADMIN_CLIENT_CLOSE_TIMEOUT_S));
284        log.debug("Closed.");
285    }
286
287    public boolean delete(String topic) {
288        log.info("Deleting topic: " + topic);
289        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topic));
290        return result.values().get(topic).isDone();
291    }
292}