/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.NameResolver;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 * <p>
 * Each appender owns one {@link KafkaProducer}; records are sent synchronously, one at a time.
 *
 * @param <M> the type of message appended to the Log
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final Name name;

    protected final Codec<M> codec;

    protected final Codec<M> encodingCodec;

    protected final NameResolver resolver;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

    protected static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    /**
     * Creates the appender and its Kafka producer, then reads the number of partitions of the topic.
     *
     * @param codec the codec exposed by {@link #getCodec()}; when it is {@code NO_CODEC} messages are encoded with a
     *            {@link SerializableCodec} instead
     * @param resolver resolves a {@link Name} into the Kafka topic or consumer group identifier
     * @param name the Log name
     * @param producerProperties Kafka producer configuration; it is copied, the caller's instance is left untouched
     * @param consumerProperties Kafka consumer configuration, used as a template by
     *            {@link #isProcessed(Name, TopicPartition, long)}
     */
    private KafkaLogAppender(Codec<M> codec, NameResolver resolver, Name name, Properties producerProperties,
            Properties consumerProperties) {
        Objects.requireNonNull(codec);
        this.codec = codec;
        this.resolver = resolver;
        if (NO_CODEC.equals(codec)) {
            this.encodingCodec = new SerializableCodec<>();
        } else {
            this.encodingCodec = codec;
        }
        this.name = name;
        this.topic = resolver.getId(name);
        // Work on a private copy: the client id written below is specific to this appender and must not be
        // written into a Properties instance that the caller may share with other appenders.
        this.producerProps = (Properties) producerProperties.clone();
        this.consumerProps = consumerProperties;
        this.producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG,
                resolver.getId(this.name) + "-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
        KafkaProducer<String, Bytes> kafkaProducer = new KafkaProducer<>(this.producerProps);
        int partitionCount;
        try {
            partitionCount = kafkaProducer.partitionsFor(topic).size();
        } catch (RuntimeException e) {
            // The producer is already open at this point: release it before propagating the failure,
            // otherwise nobody holds a reference that could be used to close it later.
            try {
                kafkaProducer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.producer = kafkaProducer;
        this.size = partitionCount;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", this.name, topic, size));
        }
    }

    /**
     * Opens a new appender on the Log identified by {@code name}.
     *
     * @param codec the codec of the appender, must not be {@code null}
     * @param resolver resolves a {@link Name} into the Kafka topic or consumer group identifier
     * @param name the Log name
     * @param producerProperties Kafka producer configuration
     * @param consumerProperties Kafka consumer configuration
     * @return a ready to use appender
     */
    public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, NameResolver resolver, Name name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(codec, resolver, name, producerProperties, consumerProperties);
    }

    @Override
    public Name name() {
        return name;
    }

    /**
     * Returns the Kafka topic identifier resolved from the Log name.
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Returns the number of partitions of the topic, as read when the appender was created.
     */
    @Override
    public int size() {
        return size;
    }

    /**
     * Appends a message to the partition selected from the key: the non-negative hash code of the key modulo the
     * number of partitions.
     *
     * @param key the record key, must not be {@code null}
     * @param message the message to append
     * @return the offset of the appended record
     */
    @Override
    public LogOffset append(String key, M message) {
        Objects.requireNonNull(key);
        int partition = (key.hashCode() & 0x7fffffff) % size;
        return append(partition, key, message);
    }

    /**
     * Appends a message to the given partition, using the partition number as record key.
     *
     * @param partition the target partition
     * @param message the message to append
     * @return the offset of the appended record
     */
    @Override
    public LogOffset append(int partition, M message) {
        String key = String.valueOf(partition);
        return append(partition, key, message);
    }

    /**
     * Encodes the message, sends it to the given partition and blocks until the send completes.
     *
     * @param partition the target partition
     * @param key the record key
     * @param message the message to append
     * @return the offset of the appended record
     * @throws StreamRuntimeException if the send fails or if the thread is interrupted while waiting, in the latter
     *             case the interrupt flag of the thread is restored
     */
    public LogOffset append(int partition, String key, M message) {
        Bytes value = Bytes.wrap(encodingCodec.encode(message));
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("Append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

    /**
     * Waits until the consumer group has moved past the given offset or until the timeout elapses.
     * <p>
     * The state is checked once immediately, then again after each pause of at most 100 ms while the deadline is not
     * reached.
     *
     * @param offset the offset to wait for, it must belong to this Log
     * @param group the consumer group
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset is seen as processed before the deadline, {@code false} otherwise
     * @throws IllegalArgumentException if the offset belongs to a different Log
     * @throws InterruptedException if the thread is interrupted while pausing between two checks
     */
    @Override
    public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " can not wait for an offset with a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public String toString() {
        return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", closed=" + closed
                + ", codec=" + codec + '}';
    }

    /**
     * Returns the codec given when the appender was opened; this may differ from the codec used to encode records
     * when it is {@code NO_CODEC}.
     */
    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    /**
     * Tells whether the position reported by Kafka for the group on the partition is strictly greater than the given
     * offset.
     * <p>
     * A short-lived consumer is created on a copy of the consumer properties for every call.
     *
     * @param group the consumer group
     * @param topicPartition the partition to inspect
     * @param offset the offset to compare with the group position
     * @return {@code true} if the position is positive and greater than {@code offset}
     */
    protected boolean isProcessed(Name group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, this is expensive to create a consumer each time
        // but this is needed, an open consumer is not properly updated
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolver.getId(group));
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

    /**
     * Closes the tracked tailers, then the producer, and marks the appender as closed.
     * <p>
     * A tailer that fails to close is logged with its exception and does not prevent the remaining tailers and the
     * producer from being closed.
     */
    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                // keep the cause in the log, the message alone does not tell why the close failed
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}