001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032import java.util.stream.Collectors;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.apache.kafka.clients.admin.AdminClientConfig;
037import org.apache.kafka.clients.admin.CreateTopicsResult;
038import org.apache.kafka.clients.admin.DescribeTopicsResult;
039import org.apache.kafka.clients.admin.NewTopic;
040import org.apache.kafka.clients.admin.TopicDescription;
041import org.apache.kafka.clients.consumer.RangeAssignor;
042import org.apache.kafka.clients.consumer.RoundRobinAssignor;
043import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
044import org.apache.kafka.common.Cluster;
045import org.apache.kafka.common.PartitionInfo;
046import org.apache.kafka.common.TopicPartition;
047import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
048import org.nuxeo.lib.stream.StreamRuntimeException;
049import org.nuxeo.lib.stream.log.LogPartition;
050
051import kafka.admin.AdminClient;
052import kafka.coordinator.group.GroupOverview;
053import scala.collection.Iterator;
054import scala.collection.JavaConverters;
055
056/**
057 * Misc Kafka Utils
058 *
059 * @since 9.3
060 */
061public class KafkaUtils implements AutoCloseable {
062    private static final Log log = LogFactory.getLog(KafkaUtils.class);
063
064    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";
065
066    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
067
068    protected final Properties adminProperties;
069
070    protected AdminClient adminClient;
071
072    protected org.apache.kafka.clients.admin.AdminClient newAdminClient;
073
074    protected List<String> allConsumers;
075
076    protected long allConsumersTime;
077
078    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000;
079
080    public KafkaUtils() {
081        this(getDefaultAdminProperties());
082    }
083
084    public KafkaUtils(Properties adminProperties) {
085        this.adminProperties = adminProperties;
086    }
087
088    public static Properties getDefaultAdminProperties() {
089        Properties ret = new Properties();
090        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
091        return ret;
092    }
093
094    public static String getBootstrapServers() {
095        return System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
096    }
097
098    public static boolean kafkaDetected() {
099        AdminClient client = AdminClient.create(getDefaultAdminProperties());
100        try {
101            client.findAllBrokers();
102            return true;
103        } catch (RuntimeException e) {
104            return false;
105        } finally {
106            client.close();
107        }
108    }
109
110    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
111        PartitionAssignor assignor = new RangeAssignor();
112        return assignments(assignor, threads, streams);
113    }
114
115    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
116        PartitionAssignor assignor = new RoundRobinAssignor();
117        return assignments(assignor, threads, streams);
118    }
119
120    protected static List<List<LogPartition>> assignments(PartitionAssignor assignor, int threads,
121            Map<String, Integer> streams) {
122        final List<PartitionInfo> parts = new ArrayList<>();
123        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));
124        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
125        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
126        for (int i = 0; i < threads; i++) {
127            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
128        }
129        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
130                Collections.emptySet());
131        Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(cluster, subscriptions);
132        List<List<LogPartition>> ret = new ArrayList<>(threads);
133        for (int i = 0; i < threads; i++) {
134            ret.add(assignments.get(String.valueOf(i))
135                               .partitions()
136                               .stream()
137                               .map(part -> new LogPartition(part.topic(), part.partition()))
138                               .collect(Collectors.toList()));
139        }
140        return ret;
141    }
142
143    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
144        Collection<PartitionInfo> ret = new ArrayList<>();
145        for (int i = 0; i < partitions; i++) {
146            ret.add(new PartitionInfo(topic, i, null, null, null));
147        }
148        return ret;
149    }
150
151    public void createTopicWithoutReplication(String topic, int partitions) {
152        createTopic(topic, partitions, (short) 1);
153    }
154
155    public void createTopic(String topic, int partitions, short replicationFactor) {
156        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
157        if (topicExists(topic)) {
158            throw new IllegalArgumentException("Cannot create Topic already exists: " + topic);
159        }
160        CreateTopicsResult ret = getNewAdminClient().createTopics(
161                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
162        try {
163            ret.all().get(5, TimeUnit.MINUTES);
164        } catch (InterruptedException e) {
165            Thread.currentThread().interrupt();
166            throw new StreamRuntimeException(e);
167        } catch (ExecutionException e) {
168            throw new StreamRuntimeException(e);
169        } catch (TimeoutException e) {
170            throw new StreamRuntimeException("Unable to create topics " + topic + " within the timeout", e);
171        }
172    }
173
174    public boolean topicExists(String topic) {
175        return partitions(topic) > 0;
176    }
177
178    public int partitions(String topic) {
179        try {
180            TopicDescription desc = getNewAdminClient().describeTopics(Collections.singletonList(topic))
181                                                       .values()
182                                                       .get(topic)
183                                                       .get();
184            if (log.isDebugEnabled()) {
185                log.debug(String.format("Topic %s exists: %s", topic, desc));
186            }
187            return desc.partitions().size();
188        } catch (InterruptedException e) {
189            Thread.currentThread().interrupt();
190            throw new StreamRuntimeException(e);
191        } catch (ExecutionException e) {
192            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
193                return -1;
194            }
195            throw new StreamRuntimeException(e);
196        }
197    }
198
199    public Set<String> listTopics() {
200        try {
201            return getNewAdminClient().listTopics().names().get();
202        } catch (InterruptedException e) {
203            Thread.currentThread().interrupt();
204            throw new StreamRuntimeException(e);
205        } catch (ExecutionException e) {
206            throw new StreamRuntimeException(e);
207        }
208    }
209
210    public List<String> listConsumers(String topic) {
211        return listAllConsumers().stream().filter(consumer -> getConsumerTopics(consumer).contains(topic)).collect(
212                Collectors.toList());
213    }
214
215    protected List<String> getConsumerTopics(String group) {
216        return JavaConverters.mapAsJavaMap(getAdminClient().listGroupOffsets(group))
217                             .keySet()
218                             .stream()
219                             .map(TopicPartition::topic)
220                             .collect(Collectors.toList());
221    }
222
223    protected org.apache.kafka.clients.admin.AdminClient getNewAdminClient() {
224        if (newAdminClient == null) {
225            newAdminClient = org.apache.kafka.clients.admin.AdminClient.create(adminProperties);
226        }
227        return newAdminClient;
228    }
229
230    protected AdminClient getAdminClient() {
231        if (adminClient == null) {
232            adminClient = AdminClient.create(adminProperties);
233        }
234        return adminClient;
235    }
236
237    public List<String> listAllConsumers() {
238        long now = System.currentTimeMillis();
239        if (allConsumers == null || (now - allConsumersTime) > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
240            allConsumers = new ArrayList<>();
241            // this returns only consumer group that use the subscribe API (known by coordinator)
242            scala.collection.immutable.List<GroupOverview> groups = getAdminClient().listAllConsumerGroupsFlattened();
243            Iterator<GroupOverview> iterator = groups.iterator();
244            GroupOverview group;
245            while (iterator.hasNext()) {
246                group = iterator.next();
247                if (group != null && !allConsumers.contains(group.groupId())) {
248                    allConsumers.add(group.groupId());
249                }
250            }
251            if (!allConsumers.isEmpty()) {
252                allConsumersTime = now;
253            }
254        }
255        return allConsumers;
256    }
257
258    /**
259     * Work only if delete.topic.enable is true which is not the default
260     */
261    public void markTopicForDeletion(String topic) {
262        log.debug("mark topic for deletion: " + topic);
263        try {
264            getNewAdminClient().deleteTopics(Collections.singleton(topic)).all().get();
265        } catch (InterruptedException e) {
266            Thread.currentThread().interrupt();
267            throw new StreamRuntimeException(e);
268        } catch (ExecutionException e) {
269            throw new StreamRuntimeException(e);
270        }
271    }
272
273    public int getNumberOfPartitions(String topic) {
274        DescribeTopicsResult descriptions = getNewAdminClient().describeTopics(Collections.singletonList(topic));
275        try {
276            return descriptions.values().get(topic).get().partitions().size();
277        } catch (InterruptedException e) {
278            Thread.currentThread().interrupt();
279            throw new StreamRuntimeException(e);
280        } catch (ExecutionException e) {
281            throw new StreamRuntimeException(e);
282        }
283    }
284
285    @Override
286    public void close() {
287        if (adminClient != null) {
288            adminClient.close();
289            adminClient = null;
290        }
291        if (newAdminClient != null) {
292            newAdminClient.close();
293            newAdminClient = null;
294        }
295        log.debug("Closed.");
296    }
297
298}