001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.time.Duration; 025import java.util.Collections; 026import java.util.Objects; 027import java.util.Properties; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.Future; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.apache.kafka.clients.consumer.ConsumerConfig; 035import org.apache.kafka.clients.consumer.KafkaConsumer; 036import org.apache.kafka.clients.producer.KafkaProducer; 037import org.apache.kafka.clients.producer.ProducerRecord; 038import org.apache.kafka.clients.producer.RecordMetadata; 039import org.apache.kafka.common.TopicPartition; 040import org.apache.kafka.common.utils.Bytes; 041import org.nuxeo.lib.stream.StreamRuntimeException; 042import org.nuxeo.lib.stream.codec.Codec; 043import org.nuxeo.lib.stream.codec.SerializableCodec; 044import org.nuxeo.lib.stream.log.LogOffset; 045import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 046import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 047 048/** 049 * Apache Kafka implementation of Log. 050 * 051 * @since 9.3 052 */ 053public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> { 054 private static final Log log = LogFactory.getLog(KafkaLogAppender.class); 055 056 protected final String topic; 057 058 protected final Properties consumerProps; 059 060 protected final Properties producerProps; 061 062 protected final int size; 063 064 // keep track of created tailers to make sure they are closed 065 protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 066 067 protected final String name; 068 069 protected final KafkaNamespace ns; 070 071 protected final Codec<M> codec; 072 073 protected final Codec<M> encodingCodec; 074 075 protected KafkaProducer<String, Bytes> producer; 076 077 protected boolean closed; 078 079 private KafkaLogAppender(Codec<M> codec, KafkaNamespace ns, String name, Properties producerProperties, 080 Properties consumerProperties) { 081 Objects.requireNonNull(codec); 082 this.codec = codec; 083 if (NO_CODEC.equals(codec)) { 084 this.encodingCodec = new SerializableCodec<>(); 085 } else { 086 this.encodingCodec = codec; 087 } 088 this.ns = ns; 089 this.topic = ns.getTopicName(name); 090 this.name = name; 091 this.producerProps = producerProperties; 092 this.consumerProps = consumerProperties; 093 this.producer = new KafkaProducer<>(this.producerProps); 094 this.size = producer.partitionsFor(topic).size(); 095 if (log.isDebugEnabled()) { 096 log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size)); 097 } 098 } 099 100 public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, KafkaNamespace ns, String name, 101 Properties producerProperties, Properties consumerProperties) { 102 return new KafkaLogAppender<>(codec, ns, name, producerProperties, consumerProperties); 103 } 104 105 @Override 106 public String name() { 107 return name; 108 } 109 110 public String getTopic() { 111 return topic; 112 } 113 114 @Override 115 public int size() { 116 return size; 117 } 118 119 @Override 120 public LogOffset append(String key, M message) { 121 Objects.requireNonNull(key); 122 int partition = (key.hashCode() & 0x7fffffff) % size; 123 return append(partition, key, message); 124 } 125 126 @Override 127 public LogOffset append(int partition, M message) { 128 String key = String.valueOf(partition); 129 return append(partition, key, message); 130 } 131 132 public LogOffset append(int partition, String key, M message) { 133 Bytes value = Bytes.wrap(encodingCodec.encode(message)); 134 ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value); 135 Future<RecordMetadata> future = producer.send(record); 136 RecordMetadata result; 137 try { 138 result = future.get(); 139 } catch (InterruptedException e) { 140 Thread.currentThread().interrupt(); 141 throw new StreamRuntimeException("Unable to send record: " + record, e); 142 } catch (ExecutionException e) { 143 throw new StreamRuntimeException("Unable to send record: " + record, e); 144 } 145 LogOffset ret = new LogOffsetImpl(name, partition, result.offset()); 146 if (log.isDebugEnabled()) { 147 int len = record.value().get().length; 148 log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(), 149 len, key, message)); 150 } 151 return ret; 152 } 153 154 @Override 155 public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException { 156 boolean ret = false; 157 if (!name.equals(offset.partition().name())) { 158 throw new IllegalArgumentException(name + " can not wait for an offset with a different Log: " + offset); 159 } 160 TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition()); 161 try { 162 ret = isProcessed(group, topicPartition, offset.offset()); 163 if (ret) { 164 return true; 165 } 166 long timeoutMs = timeout.toMillis(); 167 long deadline = System.currentTimeMillis() + timeoutMs; 168 long delay = Math.min(100, timeoutMs); 169 while (!ret && System.currentTimeMillis() < deadline) { 170 Thread.sleep(delay); 171 ret = isProcessed(group, topicPartition, offset.offset()); 172 } 173 return ret; 174 } finally { 175 if (log.isDebugEnabled()) { 176 log.debug("waitFor " + offset + "/" + group + " returns: " + ret); 177 } 178 } 179 } 180 181 @Override 182 public boolean closed() { 183 return closed; 184 } 185 186 @Override 187 public String toString() { 188 return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", ns=" + ns + ", closed=" + closed 189 + ", codec=" + codec + '}'; 190 } 191 192 @Override 193 public Codec<M> getCodec() { 194 return codec; 195 } 196 197 protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) { 198 // TODO: find a better way, this is expensive to create a consumer each time 199 // but this is needed, an open consumer is not properly updated 200 Properties props = (Properties) consumerProps.clone(); 201 props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group)); 202 try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) { 203 consumer.assign(Collections.singletonList(topicPartition)); 204 long last = consumer.position(topicPartition); 205 boolean ret = last > 0 && last > offset; 206 if (log.isDebugEnabled()) { 207 log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group 208 + ":+" + offset + "? " + ret + ", current position: " + last); 209 } 210 return ret; 211 } 212 } 213 214 @Override 215 public void close() { 216 log.debug("Closing appender: " + name); 217 tailers.stream().filter(Objects::nonNull).forEach(tailer -> { 218 try { 219 tailer.close(); 220 } catch (Exception e) { 221 log.error("Failed to close tailer: " + tailer); 222 } 223 }); 224 tailers.clear(); 225 if (producer != null) { 226 producer.close(); 227 producer = null; 228 } 229 closed = true; 230 } 231}