/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Apache Kafka implementation of a log manager.
 *
 * @since 9.3
 */
public class KafkaLogManager extends AbstractLogManager {
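    /** Consumer property to disable the use of the subscribe API; stripped before properties reach the Kafka client. */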
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

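    /**
     * Producer property holding the replication factor applied when creating topics; stripped before properties reach
     * the Kafka client.
     */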
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";

    protected final KafkaUtils kUtils;

    protected final Properties producerProperties;

    protected final Properties consumerProperties;

    protected final Properties adminProperties;

    protected final String prefix;

    protected final short defaultReplicationFactor;

    protected final boolean disableSubscribe;

    protected final KafkaNamespace ns;

    /**
     * @deprecated since 10.2, ZooKeeper is no longer needed; use
     *             {@link #KafkaLogManager(String, Properties, Properties)} without the zkServers parameter.
     */
    @Deprecated
    public KafkaLogManager(String zkServers, String prefix, Properties producerProperties,
            Properties consumerProperties) {
        this(prefix, producerProperties, consumerProperties);
    }

    /**
     * Creates a log manager that namespaces topics and groups with the given prefix, using the given Kafka producer
     * and consumer properties.
     *
     * @since 10.2
     */
    public KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties) {
        this.prefix = (prefix != null) ? prefix : "";
        this.ns = new KafkaNamespace(this.prefix);
        this.disableSubscribe = Boolean.parseBoolean(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
        this.defaultReplicationFactor = Short.parseShort(
                producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
        this.producerProperties = normalizeProducerProperties(producerProperties);
        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
        this.adminProperties = createAdminProperties(producerProperties, consumerProperties);
        this.kUtils = new KafkaUtils(adminProperties);
    }

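    /** Creates the backing topic for {@code name} with {@code size} partitions and the configured replication factor. */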
    @Override
    public void create(String name, int size) {
        kUtils.createTopic(ns.getTopicName(name), size, defaultReplicationFactor);
    }

    @Override
    protected int getSize(String name) {
        return kUtils.partitions(ns.getTopicName(name));
    }

    @Override
    public boolean exists(String name) {
        return kUtils.topicExists(ns.getTopicName(name));
    }

    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) {
        return KafkaLogAppender.open(codec, ns, name, producerProperties, consumerProperties);
    }

    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group,
            Codec<M> codec) {
        partitions.forEach(this::checkValidPartition);
        return KafkaLogTailer.createAndAssign(codec, ns, partitions, group, (Properties) consumerProperties.clone());
    }

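    /** Throws {@link IllegalArgumentException} when the partition index does not exist in the backing topic. */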
    protected void checkValidPartition(LogPartition partition) {
        int partitions = kUtils.getNumberOfPartitions(ns.getTopicName(partition.name()));
        if (partition.partition() >= partitions) {
            throw new IllegalArgumentException("Partition out of bounds: " + partition + ", max: " + partitions);
        }
    }

    public Properties getProducerProperties() {
        return producerProperties;
    }

    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    public Properties getAdminProperties() {
        return adminProperties;
    }

    @Override
    public void close() {
        super.close();
        if (kUtils != null) {
            kUtils.close();
        }
    }

    @Override
    public boolean supportSubscribe() {
        return !disableSubscribe;
    }

    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener, Codec<M> codec) {
        return KafkaLogTailer.createAndSubscribe(codec, ns, names, group, (Properties) consumerProperties.clone(),
                listener);
    }

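    /**
     * Returns a copy of the producer properties with String/Bytes serializers enforced and the internal
     * {@link #DEFAULT_REPLICATION_FACTOR_PROP} property removed.
     */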
    protected Properties normalizeProducerProperties(Properties producerProperties) {
        Properties ret;
        if (producerProperties != null) {
            ret = (Properties) producerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return ret;
    }

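    /**
     * Returns a copy of the consumer properties with String/Bytes deserializers enforced, auto commit disabled, the
     * offset reset policy set to {@code earliest}, and the internal {@link #DISABLE_SUBSCRIBE_PROP} property removed.
     */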
    protected Properties normalizeConsumerProperties(Properties consumerProperties) {
        Properties ret;
        if (consumerProperties != null) {
            ret = (Properties) consumerProperties.clone();
        } else {
            ret = new Properties();
        }
        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.BytesDeserializer");
        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ret.remove(DISABLE_SUBSCRIBE_PROP);
        return ret;
    }

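    /**
     * Builds the admin client properties, taking the bootstrap servers from the producer properties and falling back
     * to the consumer properties.
     */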
    protected Properties createAdminProperties(Properties producerProperties, Properties consumerProperties) {
        Properties ret = new Properties();
        ret.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                producerProperties.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                        consumerProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
        return ret;
    }

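    /**
     * Computes the lag of each partition by comparing the group's last committed offset with the end offset, using a
     * short-lived consumer dedicated to this call.
     */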
    @Override
    public synchronized List<LogLag> getLagPerPartition(String name, String group) {
        Properties props = (Properties) consumerProperties.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, prefix + group);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "lag");
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> topicPartitions = consumer.partitionsFor(ns.getTopicName(name))
                                                           .stream()
                                                           .map(meta -> new TopicPartition(meta.topic(),
                                                                   meta.partition()))
                                                           .collect(Collectors.toList());
            LogLag[] ret = new LogLag[topicPartitions.size()];
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                long committedOffset = 0L;
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                if (committed != null) {
                    committedOffset = committed.offset();
                }
                Long endOffset = endOffsets.get(topicPartition);
                if (endOffset == null) {
                    endOffset = 0L;
                }
                ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
            }
            return Arrays.asList(ret);
        }
    }

    @Override
    public List<String> listAll() {
        return kUtils.listTopics()
                     .stream()
                     .filter(name -> name.startsWith(prefix))
                     .map(ns::getLogName)
                     .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return "KafkaLogManager{" + "producerProperties=" + producerProperties + ", consumerProperties="
                + consumerProperties + ", prefix='" + prefix + '\'' + '}';
    }

    @Override
    public List<String> listConsumerGroups(String name) {
        String topic = ns.getTopicName(name);
        if (!exists(name)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        return kUtils.listConsumers(topic)
                     .stream()
                     .filter(group -> group.startsWith(prefix))
                     .map(ns::getGroup)
                     .collect(Collectors.toList());
    }

}