001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.io.Externalizable;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.Properties;
027import java.util.stream.Collectors;
028
029import org.apache.kafka.clients.admin.AdminClientConfig;
030import org.apache.kafka.clients.consumer.ConsumerConfig;
031import org.apache.kafka.clients.consumer.KafkaConsumer;
032import org.apache.kafka.clients.consumer.OffsetAndMetadata;
033import org.apache.kafka.clients.producer.ProducerConfig;
034import org.apache.kafka.common.TopicPartition;
035import org.apache.kafka.common.utils.Bytes;
036import org.nuxeo.lib.stream.log.LogLag;
037import org.nuxeo.lib.stream.log.LogPartition;
038import org.nuxeo.lib.stream.log.LogTailer;
039import org.nuxeo.lib.stream.log.RebalanceListener;
040import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
041import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
042
043/**
044 * @since 9.3
045 */
046public class KafkaLogManager extends AbstractLogManager {
047    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
048
049    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";
050
051    protected final KafkaUtils kUtils;
052
053    protected final Properties producerProperties;
054
055    protected final Properties consumerProperties;
056
057    protected final Properties adminProperties;
058
059    protected final String prefix;
060
061    protected final short defaultReplicationFactor;
062
063    protected final boolean disableSubscribe;
064
065    public KafkaLogManager(String zkServers, Properties producerProperties, Properties consumerProperties) {
066        this(zkServers, null, producerProperties, consumerProperties);
067    }
068
069    public KafkaLogManager(String zkServers, String topicPrefix, Properties producerProperties,
070            Properties consumerProperties) {
071        this.prefix = (topicPrefix != null) ? topicPrefix : "";
072        this.kUtils = new KafkaUtils(zkServers);
073        disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
074        defaultReplicationFactor = Short.parseShort(
075                producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
076        this.producerProperties = normalizeProducerProperties(producerProperties);
077        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
078        this.adminProperties = createAdminProperties(producerProperties, consumerProperties);
079    }
080
081    protected String getTopicName(String name) {
082        return prefix + name;
083    }
084
085    protected String getNameFromTopic(String topic) {
086        if (!topic.startsWith(prefix)) {
087            throw new IllegalArgumentException(String.format("topic %s with invalid prefix %s", topic, prefix));
088        }
089        return topic.substring(prefix.length());
090    }
091
092    @Override
093    public void create(String name, int size) {
094        kUtils.createTopic(getAdminProperties(), getTopicName(name), size, defaultReplicationFactor);
095    }
096
097    @Override
098    public boolean exists(String name) {
099        return kUtils.topicExists(getTopicName(name));
100    }
101
102    @Override
103    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
104        return KafkaLogAppender.open(getTopicName(name), name, producerProperties, consumerProperties);
105    }
106
107    @Override
108    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
109            String group) {
110        partitions.forEach(this::checkValidPartition);
111        return KafkaLogTailer.createAndAssign(prefix, partitions, group, (Properties) consumerProperties.clone());
112    }
113
114    protected void checkValidPartition(LogPartition partition) {
115        int partitions = kUtils.getNumberOfPartitions(getAdminProperties(), getTopicName(partition.name()));
116        if (partition.partition() >= partitions) {
117            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
118        }
119    }
120
121    public Properties getProducerProperties() {
122        return producerProperties;
123    }
124
125    public Properties getConsumerProperties() {
126        return consumerProperties;
127    }
128
129    public Properties getAdminProperties() {
130        return adminProperties;
131    }
132
133    @Override
134    public void close() {
135        super.close();
136        if (kUtils != null) {
137            kUtils.close();
138        }
139    }
140
141    @Override
142    public boolean supportSubscribe() {
143        return !disableSubscribe;
144    }
145
146    @Override
147    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
148            RebalanceListener listener) {
149        return KafkaLogTailer.createAndSubscribe(prefix, names, group, (Properties) consumerProperties.clone(),
150                listener);
151    }
152
153    protected Properties normalizeProducerProperties(Properties producerProperties) {
154        Properties ret;
155        if (producerProperties != null) {
156            ret = (Properties) producerProperties.clone();
157        } else {
158            ret = new Properties();
159        }
160        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
161        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
162        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
163        return ret;
164    }
165
166    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
167        Properties ret;
168        if (consumerProperties != null) {
169            ret = (Properties) consumerProperties.clone();
170        } else {
171            ret = new Properties();
172        }
173        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
174                "org.apache.kafka.common.serialization.StringDeserializer");
175        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
176                "org.apache.kafka.common.serialization.BytesDeserializer");
177        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
178        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
179        ret.remove(DISABLE_SUBSCRIBE_PROP);
180        return ret;
181    }
182
183    protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) {
184        Properties ret = new Properties();
185        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
186                producerProperties.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
187                        consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
188        return ret;
189    }
190
191    @Override
192    public List<LogLag> getLagPerPartition(String name, String group) {
193        Properties props = (Properties) consumerProperties.clone();
194        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
195        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
196            List<TopicPartition> topicPartitions = consumer.partitionsFor(getTopicName(name))
197                                                           .stream()
198                                                           .map(meta -> new TopicPartition(meta.topic(),
199                                                                   meta.partition()))
200                                                           .collect(Collectors.toList());
201            LogLag[] ret = new LogLag[topicPartitions.size()];
202            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
203            for (TopicPartition topicPartition : topicPartitions) {
204                long committedOffset = 0L;
205                OffsetAndMetadata committed = consumer.committed(topicPartition);
206                if (committed != null) {
207                    committedOffset = committed.offset();
208                }
209                Long endOffset = endOffsets.get(topicPartition);
210                if (endOffset == null) {
211                    endOffset = 0L;
212                }
213                ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
214            }
215            return Arrays.asList(ret);
216        }
217    }
218
219    @Override
220    public List<String> listAll() {
221        return kUtils.listTopics().stream().filter(name -> name.startsWith(prefix)).map(this::getNameFromTopic).collect(
222                Collectors.toList());
223    }
224
225    @Override
226    public String toString() {
227        return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties="
228                + consumerProperties + ", prefix='" + prefix + '\'' + '}';
229    }
230
231    @Override
232    public List<String> listConsumerGroups(String name) {
233        String topic = getTopicName(name);
234        if (!exists(name)) {
235            throw new IllegalArgumentException("Unknown Log: " + name);
236        }
237        return kUtils.listConsumers(getProducerProperties(), topic);
238    }
239
240}