001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.kafka;
020
021import java.io.Externalizable;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Properties;
031import java.util.Set;
032import java.util.stream.Collectors;
033
034import org.apache.kafka.clients.consumer.ConsumerConfig;
035import org.apache.kafka.clients.consumer.KafkaConsumer;
036import org.apache.kafka.clients.consumer.OffsetAndMetadata;
037import org.apache.kafka.common.TopicPartition;
038import org.apache.kafka.common.utils.Bytes;
039import org.nuxeo.lib.stream.codec.Codec;
040import org.nuxeo.lib.stream.log.LogConfig;
041import org.nuxeo.lib.stream.log.LogLag;
042import org.nuxeo.lib.stream.log.LogPartition;
043import org.nuxeo.lib.stream.log.LogTailer;
044import org.nuxeo.lib.stream.log.Name;
045import org.nuxeo.lib.stream.log.RebalanceListener;
046import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
047import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
048
049/**
050 * @since 9.3
051 */
052public class KafkaLogManager extends AbstractLogManager {
053    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
054
055    protected final List<KafkaLogConfig> configs;
056
057    protected final KafkaLogConfig defaultConfig;
058
059    protected final Map<KafkaLogConfig, KafkaUtils> kUtils = new HashMap<>();
060
061    /**
062     * @since 10.2
063     */
064    public KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties) {
065        this(Collections.singletonList(new KafkaLogConfig("unknown", true, Collections.emptyList(), prefix, null,
066                producerProperties, consumerProperties)));
067    }
068
069    /**
070     * @since 11.1
071     */
072    public KafkaLogManager(List<KafkaLogConfig> kafkaConfigs) {
073        if (kafkaConfigs == null && kafkaConfigs.isEmpty()) {
074            throw new IllegalArgumentException("config required");
075        }
076        this.configs = kafkaConfigs;
077        this.defaultConfig = findDefaultConfig();
078        configs.forEach(config -> kUtils.put(config, new KafkaUtils(config.getAdminProperties())));
079    }
080
081    protected KafkaLogConfig findDefaultConfig() {
082        List<KafkaLogConfig> defaultConfigs = configs.stream()
083                                                     .filter(LogConfig::isDefault)
084                                                     .collect(Collectors.toList());
085        // use the last default config
086        if (defaultConfigs.isEmpty()) {
087            return configs.get(configs.size() - 1);
088        }
089        return defaultConfigs.get(defaultConfigs.size() - 1);
090    }
091
092    protected KafkaLogConfig getConfig(Name name) {
093        return configs.stream().filter(config -> config.match(name)).findFirst().orElse(defaultConfig);
094    }
095
096    protected KafkaLogConfig getConfig(Name name, Name group) {
097        return configs.stream().filter(config -> config.match(name, group)).findFirst().orElse(defaultConfig);
098    }
099
100    @Override
101    public void create(Name name, int size) {
102        KafkaLogConfig config = getConfig(name);
103        kUtils.get(config).createTopic(config.getResolver().getId(name), size, config.getReplicatorFactor());
104    }
105
106    @Override
107    protected int getSize(Name name) {
108        KafkaLogConfig config = getConfig(name);
109        return kUtils.get(config).partitions(config.getResolver().getId(name));
110    }
111
112    @Override
113    public boolean exists(Name name) {
114        KafkaLogConfig config = getConfig(name);
115        return kUtils.get(config).topicExists(config.getResolver().getId(name));
116    }
117
118    @Override
119    public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec) {
120        KafkaLogConfig config = getConfig(name);
121        return KafkaLogAppender.open(codec, config.getResolver(), name, config.getProducerProperties(),
122                config.getConsumerProperties());
123    }
124
125    @Override
126    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group,
127            Codec<M> codec) {
128        partitions.forEach(this::checkValidPartition);
129        if (partitions.isEmpty()) {
130            return KafkaLogTailer.createAndAssign(codec, defaultConfig.getResolver(), partitions, group,
131                    (Properties) defaultConfig.getConsumerProperties().clone());
132        }
133        KafkaLogConfig config = getConfig(partitions.iterator().next().name());
134        return KafkaLogTailer.createAndAssign(codec, config.getResolver(), partitions, group,
135                (Properties) config.getConsumerProperties().clone());
136    }
137
138    protected void checkValidPartition(LogPartition partition) {
139        KafkaLogConfig config = getConfig(partition.name());
140        int partitions = kUtils.get(config).getNumberOfPartitions(config.getResolver().getId(partition.name()));
141        if (partition.partition() >= partitions) {
142            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
143        }
144    }
145
146    @Override
147    public void close() {
148        super.close();
149        configs.forEach(config -> kUtils.get(config).close());
150    }
151
152    @Override
153    public boolean supportSubscribe() {
154        return !defaultConfig.getDisableSubscribe();
155    }
156
157    @Override
158    protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
159            RebalanceListener listener, Codec<M> codec) {
160        KafkaLogConfig config = getConfig(names.iterator().next(), group);
161        return KafkaLogTailer.createAndSubscribe(codec, config.getResolver(), names, group,
162                (Properties) config.getConsumerProperties().clone(), listener);
163    }
164
165    @Override
166    public List<LogLag> getLagPerPartition(Name name, Name group) {
167        KafkaLogConfig config = getConfig(name, group);
168        Properties props = (Properties) config.getConsumerProperties().clone();
169        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getResolver().getId(group));
170        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getResolver().getId(group) + "-lag");
171        // Prevents to create multiple consumers with the same client/group ids
172        synchronized(KafkaLogManager.class) {
173            try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
174                Set<TopicPartition> topicPartitions = consumer.partitionsFor(config.getResolver().getId(name))
175                                                               .stream()
176                        .map(meta -> new TopicPartition(meta.topic(),
177                                meta.partition()))
178                        .collect(Collectors.toSet());
179                LogLag[] ret = new LogLag[topicPartitions.size()];
180                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
181                Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);
182                for (TopicPartition topicPartition : topicPartitions) {
183                    OffsetAndMetadata committed = committedOffsets.get(topicPartition);
184                    long committedOffset = 0L;
185                    if (committed != null) {
186                        committedOffset = committed.offset();
187                    }
188                    Long endOffset = endOffsets.get(topicPartition);
189                    if (endOffset == null) {
190                        endOffset = 0L;
191                    }
192                    ret[topicPartition.partition()] = new LogLag(committedOffset, endOffset);
193                }
194                return Arrays.asList(ret);
195            }
196        }
197    }
198
199    @Override
200    public List<Name> listAllNames() {
201        Set<String> allTopics = kUtils.get(defaultConfig).listTopics();
202        Set<Name> names = new HashSet<>(allTopics.size());
203        for (String topic : allTopics) {
204            for (KafkaLogConfig config : configs) {
205                if (topic.startsWith(config.getResolver().getPrefix())) {
206                    names.add(config.getResolver().getName(topic));
207                }
208            }
209        }
210        return new ArrayList<>(names);
211    }
212
213    @Override
214    public String toString() {
215        // TODO: filter displayed props
216        return "KafkaLogManager{" + "configs=" + configs + ", defaultConfig=" + defaultConfig + ", defaultResolver"
217                + defaultConfig.getResolver()
218                + '}';
219    }
220
221    protected String filterDisplayedProperties(Properties properties) {
222        String ret = properties.toString();
223        if (ret.indexOf("password") < 0) {
224            return ret;
225        }
226        return ret.replaceAll("password=.[^\\\"\\;\\,\\ ]*", "password=****");
227    }
228
229    @Override
230    public List<Name> listConsumerGroups(Name name) {
231        KafkaLogConfig config = getConfig(name);
232        String topic = config.getResolver().getId(name);
233        if (!kUtils.get(config).topicExists(topic)) {
234            throw new IllegalArgumentException("Unknown Log: " + name);
235        }
236        return kUtils.get(config)
237                     .listConsumers(topic)
238                     .stream()
239                     .filter(group -> group.startsWith(config.getResolver().getPrefix()))
240                     .map(config.getResolver()::getName)
241                     .collect(Collectors.toList());
242    }
243
244    @Override
245    public boolean delete(Name name) {
246        KafkaLogConfig config = getConfig(name);
247        return kUtils.get(config).delete(config.getResolver().getId(name));
248    }
249}