/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Apache Kafka implementation of a Log manager, storing each Log as a Kafka topic.
 *
 * @since 9.3
 */
public class KafkaLogManager extends AbstractLogManager {
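    /**
     * Pseudo consumer property, consumed by this manager and never passed to Kafka: when set to {@code true} the
     * subscribe API is reported as unsupported.
     */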
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

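    /**
     * Pseudo producer property, consumed by this manager and never passed to Kafka: the replication factor used when
     * creating a topic.
     */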
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";

    protected final KafkaUtils kUtils;

    protected final Properties producerProperties;

    protected final Properties consumerProperties;

    protected final Properties adminProperties;

    protected final String prefix;

    protected final short defaultReplicationFactor;

    protected final boolean disableSubscribe;

    protected final KafkaNamespace ns;

    public KafkaLogManager(String zkServers, Properties producerProperties, Properties consumerProperties) {
        this(zkServers, null, producerProperties, consumerProperties);
    }

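    /**
     * Creates a manager for Logs stored as Kafka topics. The optional {@code prefix} is prepended to topic and group
     * names, so that multiple managers can share the same Kafka cluster.
     */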
    public KafkaLogManager(String zkServers, String prefix, Properties producerProperties,
            Properties consumerProperties) {
        this.prefix = (prefix != null) ? prefix : "";
        this.ns = new KafkaNamespace(this.prefix);
        this.disableSubscribe = Boolean.parseBoolean(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
        this.defaultReplicationFactor = Short.parseShort(
                producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
        this.producerProperties = normalizeProducerProperties(producerProperties);
        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
        this.adminProperties = createAdminProperties(producerProperties, consumerProperties);
        this.kUtils = new KafkaUtils(zkServers, adminProperties);
    }

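    /** Creates the backing Kafka topic; {@code size} is the number of partitions. */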
    @Override
    public void create(String name, int size) {
        kUtils.createTopic(ns.getTopicName(name), size, defaultReplicationFactor);
    }

    @Override
    public boolean exists(String name) {
        return kUtils.topicExists(ns.getTopicName(name));
    }

    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
        return KafkaLogAppender.open(ns, name, producerProperties, consumerProperties);
    }

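    /** Creates a tailer explicitly assigned to the given partitions, without going through a group rebalance. */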
    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
            String group) {
        partitions.forEach(this::checkValidPartition);
        return KafkaLogTailer.createAndAssign(ns, partitions, group, (Properties) consumerProperties.clone());
    }

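    /** Checks that the partition index exists in the topic, throwing {@link IllegalArgumentException} otherwise. */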
    protected void checkValidPartition(LogPartition partition) {
        int partitions = kUtils.getNumberOfPartitions(ns.getTopicName(partition.name()));
        if (partition.partition() >= partitions) {
            throw new IllegalArgumentException("Partition out of bounds: " + partition + " max: " + partitions);
        }
    }

    public Properties getProducerProperties() {
        return producerProperties;
    }

    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    public Properties getAdminProperties() {
        return adminProperties;
    }

    @Override
    public void close() {
        super.close();
        if (kUtils != null) {
            kUtils.close();
        }
    }

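    /** Subscribe support can be turned off with the {@link #DISABLE_SUBSCRIBE_PROP} consumer property. */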
    @Override
    public boolean supportSubscribe() {
        return !disableSubscribe;
    }

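    /** Creates a tailer that subscribes to the topics and receives its partitions through group rebalancing. */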
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener) {
        return KafkaLogTailer.createAndSubscribe(ns, names, group, (Properties) consumerProperties.clone(), listener);
    }

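    /**
     * Returns a copy of the producer properties with the serializers forced to String keys and Bytes values, and the
     * {@link #DEFAULT_REPLICATION_FACTOR_PROP} pseudo property removed so it is not passed to Kafka.
     */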
    protected Properties normalizeProducerProperties(Properties producerProperties) {
        Properties ret;
        if (producerProperties != null) {
            ret = (Properties) producerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return ret;
    }

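    /**
     * Returns a copy of the consumer properties with the deserializers forced to String keys and Bytes values, auto
     * commit disabled, offset reset set to {@code earliest}, and the {@link #DISABLE_SUBSCRIBE_PROP} pseudo property
     * removed so it is not passed to Kafka.
     */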
    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
        Properties ret;
        if (consumerProperties != null) {
            ret = (Properties) consumerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.BytesDeserializer");
        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ret.remove(DISABLE_SUBSCRIBE_PROP);
        return ret;
    }

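    /**
     * Builds the admin client properties, taking the bootstrap servers from the producer properties and falling back
     * to the consumer properties.
     */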
    protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) {
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                producerProperties.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                        consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
        return ret;
    }

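    /** Computes, for each partition of the Log, the lag between the group's committed offset and the end offset. */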
    @Override
    public List<LogLag> getLagPerPartition(String name, String group) {
        Properties props = (Properties) consumerProperties.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, prefix + group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> topicPartitions = consumer.partitionsFor(ns.getTopicName(name))
                                                           .stream()
                                                           .map(meta -> new TopicPartition(meta.topic(),
                                                                   meta.partition()))
                                                           .collect(Collectors.toList());
            LogLag[] ret = new LogLag[topicPartitions.size()];
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                // a group with no commit on this partition has consumed nothing
                long committedOffset = 0L;
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                if (committed != null) {
                    committedOffset = committed.offset();
                }
                Long endOffset = endOffsets.get(topicPartition);
                if (endOffset == null) {
                    endOffset = 0L;
                }
                ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
            }
            return Arrays.asList(ret);
        }
    }

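    /** Lists the Logs of this namespace, that is, the topics whose name starts with the manager prefix. */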
    @Override
    public List<String> listAll() {
        return kUtils.listTopics()
                     .stream()
                     .filter(name -> name.startsWith(prefix))
                     .map(ns::getLogName)
                     .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties="
                + consumerProperties + ", prefix='" + prefix + '\'' + '}';
    }

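    /** Lists the consumer groups of this namespace that consume from the Log. */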
    @Override
    public List<String> listConsumerGroups(String name) {
        if (!exists(name)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        String topic = ns.getTopicName(name);
        return kUtils.listConsumers(topic)
                     .stream()
                     .filter(group -> group.startsWith(prefix))
                     .map(ns::getGroup)
                     .collect(Collectors.toList());
    }

}