001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.time.Duration; 025import java.util.Collections; 026import java.util.Objects; 027import java.util.Properties; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.Future; 031import java.util.concurrent.atomic.AtomicInteger; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.apache.kafka.clients.consumer.ConsumerConfig; 036import org.apache.kafka.clients.consumer.KafkaConsumer; 037import org.apache.kafka.clients.producer.KafkaProducer; 038import org.apache.kafka.clients.producer.ProducerConfig; 039import org.apache.kafka.clients.producer.ProducerRecord; 040import org.apache.kafka.clients.producer.RecordMetadata; 041import org.apache.kafka.common.TopicPartition; 042import org.apache.kafka.common.utils.Bytes; 043import org.nuxeo.lib.stream.StreamRuntimeException; 044import org.nuxeo.lib.stream.codec.Codec; 045import org.nuxeo.lib.stream.codec.SerializableCodec; 046import org.nuxeo.lib.stream.log.LogOffset; 047import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 048import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 049 050/** 051 * Apache Kafka implementation of Log. 052 * 053 * @since 9.3 054 */ 055public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> { 056 private static final Log log = LogFactory.getLog(KafkaLogAppender.class); 057 058 protected final String topic; 059 060 protected final Properties consumerProps; 061 062 protected final Properties producerProps; 063 064 protected final int size; 065 066 // keep track of created tailers to make sure they are closed 067 protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 068 069 protected final String name; 070 071 protected final KafkaNamespace ns; 072 073 protected final Codec<M> codec; 074 075 protected final Codec<M> encodingCodec; 076 077 protected KafkaProducer<String, Bytes> producer; 078 079 protected boolean closed; 080 081 protected static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); 082 083 private KafkaLogAppender(Codec<M> codec, KafkaNamespace ns, String name, Properties producerProperties, 084 Properties consumerProperties) { 085 Objects.requireNonNull(codec); 086 this.codec = codec; 087 if (NO_CODEC.equals(codec)) { 088 this.encodingCodec = new SerializableCodec<>(); 089 } else { 090 this.encodingCodec = codec; 091 } 092 this.ns = ns; 093 this.topic = ns.getTopicName(name); 094 this.name = name; 095 this.producerProps = producerProperties; 096 this.consumerProps = consumerProperties; 097 producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 098 name + "-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement()); 099 this.producer = new KafkaProducer<>(this.producerProps); 100 this.size = producer.partitionsFor(topic).size(); 101 if (log.isDebugEnabled()) { 102 log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size)); 103 } 104 } 105 106 public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, KafkaNamespace ns, String name, 107 Properties producerProperties, Properties consumerProperties) { 108 return new KafkaLogAppender<>(codec, ns, name, producerProperties, consumerProperties); 109 } 110 111 @Override 112 public String name() { 113 return name; 114 } 115 116 public String getTopic() { 117 return topic; 118 } 119 120 @Override 121 public int size() { 122 return size; 123 } 124 125 @Override 126 public LogOffset append(String key, M message) { 127 Objects.requireNonNull(key); 128 int partition = (key.hashCode() & 0x7fffffff) % size; 129 return append(partition, key, message); 130 } 131 132 @Override 133 public LogOffset append(int partition, M message) { 134 String key = String.valueOf(partition); 135 return append(partition, key, message); 136 } 137 138 public LogOffset append(int partition, String key, M message) { 139 Bytes value = Bytes.wrap(encodingCodec.encode(message)); 140 ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value); 141 Future<RecordMetadata> future = producer.send(record); 142 RecordMetadata result; 143 try { 144 result = future.get(); 145 } catch (InterruptedException e) { 146 Thread.currentThread().interrupt(); 147 throw new StreamRuntimeException("Unable to send record: " + record, e); 148 } catch (ExecutionException e) { 149 throw new StreamRuntimeException("Unable to send record: " + record, e); 150 } 151 LogOffset ret = new LogOffsetImpl(name, partition, result.offset()); 152 if (log.isDebugEnabled()) { 153 int len = record.value().get().length; 154 log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(), 155 len, key, message)); 156 } 157 return ret; 158 } 159 160 @Override 161 public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException { 162 boolean ret = false; 163 if (!name.equals(offset.partition().name())) { 164 throw new IllegalArgumentException(name + " can not wait for an offset with a different Log: " + offset); 165 } 166 TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition()); 167 try { 168 ret = isProcessed(group, topicPartition, offset.offset()); 169 if (ret) { 170 return true; 171 } 172 long timeoutMs = timeout.toMillis(); 173 long deadline = System.currentTimeMillis() + timeoutMs; 174 long delay = Math.min(100, timeoutMs); 175 while (!ret && System.currentTimeMillis() < deadline) { 176 Thread.sleep(delay); 177 ret = isProcessed(group, topicPartition, offset.offset()); 178 } 179 return ret; 180 } finally { 181 if (log.isDebugEnabled()) { 182 log.debug("waitFor " + offset + "/" + group + " returns: " + ret); 183 } 184 } 185 } 186 187 @Override 188 public boolean closed() { 189 return closed; 190 } 191 192 @Override 193 public String toString() { 194 return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", ns=" + ns + ", closed=" + closed 195 + ", codec=" + codec + '}'; 196 } 197 198 @Override 199 public Codec<M> getCodec() { 200 return codec; 201 } 202 203 protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) { 204 // TODO: find a better way, this is expensive to create a consumer each time 205 // but this is needed, an open consumer is not properly updated 206 Properties props = (Properties) consumerProps.clone(); 207 props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group)); 208 try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) { 209 consumer.assign(Collections.singletonList(topicPartition)); 210 long last = consumer.position(topicPartition); 211 boolean ret = last > 0 && last > offset; 212 if (log.isDebugEnabled()) { 213 log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group 214 + ":+" + offset + "? " + ret + ", current position: " + last); 215 } 216 return ret; 217 } 218 } 219 220 @Override 221 public void close() { 222 log.debug("Closing appender: " + name); 223 tailers.stream().filter(Objects::nonNull).forEach(tailer -> { 224 try { 225 tailer.close(); 226 } catch (Exception e) { 227 log.error("Failed to close tailer: " + tailer); 228 } 229 }); 230 tailers.clear(); 231 if (producer != null) { 232 producer.close(); 233 producer = null; 234 } 235 closed = true; 236 } 237}