001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;
020
021import org.apache.kafka.clients.consumer.ConsumerConfig;
022import org.apache.kafka.clients.producer.ProducerConfig;
023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.AbstractMQManager;
028
029import java.io.Externalizable;
030import java.util.Collection;
031import java.util.Properties;
032
033/**
034 * @since 9.2
035 */
036public class KafkaMQManager<M extends Externalizable> extends AbstractMQManager<M> {
037    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
038    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";
039
040    private final KafkaUtils kUtils;
041    private final Properties producerProperties;
042    private final Properties consumerProperties;
043    private final String prefix;
044    private final Integer defaultReplicationFactor;
045    private boolean disableSubscribe = false;
046
047
048    public KafkaMQManager(String zkServers, Properties producerProperties, Properties consumerProperties) {
049        this(zkServers, null, producerProperties, consumerProperties);
050    }
051
052    public KafkaMQManager(String zkServers, String topicPrefix, Properties producerProperties, Properties consumerProperties) {
053        this.prefix = (topicPrefix != null) ? topicPrefix : "";
054        this.kUtils = new KafkaUtils(zkServers);
055        disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
056        defaultReplicationFactor = Integer.valueOf(producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
057        this.producerProperties = normalizeProducerProperties(producerProperties);
058        this.consumerProperties = normalizeConsumerProperties(consumerProperties);
059    }
060
061    protected String getTopicName(String name) {
062        return prefix + name;
063    }
064
065    @Override
066    public void create(String name, int size) {
067        kUtils.createTopic(getTopicName(name), size, defaultReplicationFactor);
068    }
069
070    @Override
071    public boolean exists(String name) {
072        return kUtils.topicExists(getTopicName(name));
073    }
074
075
076    @Override
077    public MQAppender<M> createAppender(String name) {
078        return KafkaMQAppender.open(getTopicName(name), name, producerProperties, consumerProperties);
079    }
080
081    @Override
082    protected MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) {
083        partitions.forEach(this::checkValidPartition);
084        return KafkaMQTailer.createAndAssign(prefix, partitions, group, (Properties) consumerProperties.clone());
085    }
086
087    private void checkValidPartition(MQPartition partition) {
088        int partitions = kUtils.getNumberOfPartitions(getTopicName(partition.name()));
089        if (partition.partition() >= partitions) {
090            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
091        }
092    }
093
094    public Properties getProducerProperties() {
095        return producerProperties;
096    }
097
098    public Properties getConsumerProperties() {
099        return consumerProperties;
100    }
101
102    @Override
103    public void close() throws Exception {
104        super.close();
105        if (kUtils != null) {
106            kUtils.close();
107        }
108    }
109
110    @Override
111    public boolean supportSubscribe() {
112        return !disableSubscribe;
113    }
114
115    @Override
116    protected MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) {
117        return KafkaMQTailer.createAndSubscribe(prefix, names, group, (Properties) consumerProperties.clone(), listener);
118    }
119
120    protected static Properties normalizeConsumerProperties(Properties consumerProperties) {
121        Properties ret;
122        if (consumerProperties != null) {
123            ret = (Properties) consumerProperties.clone();
124        } else {
125            ret = new Properties();
126        }
127        ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
128        ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesDeserializer");
129        ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
130        ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
131        ret.remove(DISABLE_SUBSCRIBE_PROP);
132        return ret;
133    }
134
135    protected Properties normalizeProducerProperties(Properties producerProperties) {
136        Properties ret;
137        if (producerProperties != null) {
138            ret = (Properties) producerProperties.clone();
139        } else {
140            ret = new Properties();
141        }
142        ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
143        ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer");
144        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
145        return ret;
146    }
147
148}