001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka; 020 021import org.apache.kafka.clients.consumer.ConsumerConfig; 022import org.apache.kafka.clients.producer.ProducerConfig; 023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender; 024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition; 025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener; 026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer; 027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.AbstractMQManager; 028 029import java.io.Externalizable; 030import java.util.Collection; 031import java.util.Properties; 032 033/** 034 * @since 9.2 035 */ 036public class KafkaMQManager<M extends Externalizable> extends AbstractMQManager<M> { 037 public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable"; 038 public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor"; 039 040 private final KafkaUtils kUtils; 041 private final Properties producerProperties; 042 private final Properties consumerProperties; 043 private final String prefix; 044 private final Integer defaultReplicationFactor; 045 private boolean disableSubscribe = false; 046 047 048 public KafkaMQManager(String zkServers, Properties producerProperties, Properties consumerProperties) { 049 this(zkServers, null, producerProperties, consumerProperties); 050 } 051 052 public KafkaMQManager(String zkServers, String topicPrefix, Properties producerProperties, Properties consumerProperties) { 053 this.prefix = (topicPrefix != null) ? topicPrefix : ""; 054 this.kUtils = new KafkaUtils(zkServers); 055 disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false")); 056 defaultReplicationFactor = Integer.valueOf(producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1")); 057 this.producerProperties = normalizeProducerProperties(producerProperties); 058 this.consumerProperties = normalizeConsumerProperties(consumerProperties); 059 } 060 061 protected String getTopicName(String name) { 062 return prefix + name; 063 } 064 065 @Override 066 public void create(String name, int size) { 067 kUtils.createTopic(getTopicName(name), size, defaultReplicationFactor); 068 } 069 070 @Override 071 public boolean exists(String name) { 072 return kUtils.topicExists(getTopicName(name)); 073 } 074 075 076 @Override 077 public MQAppender<M> createAppender(String name) { 078 return KafkaMQAppender.open(getTopicName(name), name, producerProperties, consumerProperties); 079 } 080 081 @Override 082 protected MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) { 083 partitions.forEach(this::checkValidPartition); 084 return KafkaMQTailer.createAndAssign(prefix, partitions, group, (Properties) consumerProperties.clone()); 085 } 086 087 private void checkValidPartition(MQPartition partition) { 088 int partitions = kUtils.getNumberOfPartitions(getTopicName(partition.name())); 089 if (partition.partition() >= partitions) { 090 throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions); 091 } 092 } 093 094 public Properties getProducerProperties() { 095 return producerProperties; 096 } 097 098 public Properties getConsumerProperties() { 099 return consumerProperties; 100 } 101 102 @Override 103 public void close() throws Exception { 104 super.close(); 105 if (kUtils != null) { 106 kUtils.close(); 107 } 108 } 109 110 @Override 111 public boolean supportSubscribe() { 112 return !disableSubscribe; 113 } 114 115 @Override 116 protected MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) { 117 return KafkaMQTailer.createAndSubscribe(prefix, names, group, (Properties) consumerProperties.clone(), listener); 118 } 119 120 protected static Properties normalizeConsumerProperties(Properties consumerProperties) { 121 Properties ret; 122 if (consumerProperties != null) { 123 ret = (Properties) consumerProperties.clone(); 124 } else { 125 ret = new Properties(); 126 } 127 ret.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 128 ret.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesDeserializer"); 129 ret.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 130 ret.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 131 ret.remove(DISABLE_SUBSCRIBE_PROP); 132 return ret; 133 } 134 135 protected Properties normalizeProducerProperties(Properties producerProperties) { 136 Properties ret; 137 if (producerProperties != null) { 138 ret = (Properties) producerProperties.clone(); 139 } else { 140 ret = new Properties(); 141 } 142 ret.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 143 ret.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer"); 144 ret.remove(DEFAULT_REPLICATION_FACTOR_PROP); 145 return ret; 146 } 147 148}