/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 * <p>
 * Each message is Java-serialized and sent to an explicit partition of a single Kafka topic; {@link #append} blocks
 * until the broker acknowledges the record. The number of partitions is read once, when the appender is created.
 *
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    /** Name of the Kafka topic records are sent to. */
    protected final String topic;

    /** Base consumer configuration, cloned for each short-lived consumer created by {@link #isProcessed}. */
    protected final Properties consumerProps;

    /** Configuration used to create {@link #producer}. */
    protected final Properties producerProps;

    /** Number of partitions of {@link #topic}, as reported by the producer at construction time. */
    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    /** Log name, used in the returned {@link LogOffset}s and in log output. */
    protected final String name;

    /** Producer used by {@link #append}; set to {@code null} by {@link #close()}. */
    protected KafkaProducer<String, Bytes> producer;

    /** Set to {@code true} at the end of {@link #close()}. */
    protected boolean closed;

    /**
     * Creates the producer and reads the partition count of the topic.
     * <p>
     * If the partition lookup fails, the producer that was just created is closed before the failure is propagated so
     * that it is not leaked.
     */
    private KafkaLogAppender(String topic, String name, Properties producerProperties, Properties consumerProperties) {
        this.topic = topic;
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = countPartitions(this.producer, topic);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
        }
    }

    /**
     * Returns the number of partitions of {@code topic}; closes {@code producer} and rethrows if the lookup fails.
     */
    private static int countPartitions(KafkaProducer<String, Bytes> producer, String topic) {
        try {
            return producer.partitionsFor(topic).size();
        } catch (RuntimeException e) {
            try {
                producer.close();
            } catch (RuntimeException closeFailure) {
                // keep the original failure as the primary one
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Opens an appender on an existing topic.
     *
     * @param topic the Kafka topic to append to
     * @param name the Log name reported by {@link #name()}
     * @param producerProperties the Kafka producer configuration
     * @param consumerProperties the Kafka consumer configuration used by {@link #waitFor}
     * @return a new appender
     */
    public static <M extends Externalizable> KafkaLogAppender<M> open(String topic, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(topic, name, producerProperties, consumerProperties);
    }

    /**
     * @return the Log name given when the appender was opened
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * @return the Kafka topic this appender writes to
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @return the number of partitions of the topic, as read when the appender was created
     */
    @Override
    public int size() {
        return size;
    }

    /**
     * Serializes the message, sends it to the given partition and waits for the send to complete.
     *
     * @param partition the target partition; its string form is also used as the record key
     * @param message the message to append
     * @return the offset assigned to the record
     * @throws IllegalStateException if the appender has been closed
     * @throws RuntimeException if the send fails or the calling thread is interrupted while waiting; in the latter
     *             case the interrupt flag is restored
     */
    @Override
    public LogOffset append(int partition, Externalizable message) {
        KafkaProducer<String, Bytes> activeProducer = producer;
        if (activeProducer == null) {
            // close() nulls the producer; fail with an explicit message instead of a bare NullPointerException
            throw new IllegalStateException("Appender is closed: " + name);
        }
        Bytes value = Bytes.wrap(messageAsByteArray(message));
        String key = String.valueOf(partition);
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = activeProducer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

    /**
     * Serializes the message with Java object serialization.
     *
     * @param message the message to serialize
     * @return the serialized bytes
     * @throws RuntimeException wrapping any {@link IOException} raised during serialization
     */
    protected byte[] messageAsByteArray(Externalizable message) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
            // flush so that buffered object data reaches the byte array before it is copied
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls until the consumer group has moved past the given offset or the timeout expires.
     * <p>
     * The check is done once immediately, then repeated after sleeping at most 100 ms at a time, never sleeping past
     * the deadline.
     *
     * @param offset the offset to wait for; it must belong to this Log
     * @param group the consumer group whose position is checked
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset was seen as processed before the timeout, {@code false} otherwise
     * @throws IllegalArgumentException if the offset belongs to a Log with a different name
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " can not wait for an offset with a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long now = System.currentTimeMillis();
            // saturate instead of overflowing to a negative deadline on very large timeouts
            long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
            long delay = Math.min(100, timeoutMs);
            while (!ret) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                // never sleep past the deadline
                Thread.sleep(Math.min(delay, remaining));
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    /**
     * @return {@code true} once {@link #close()} has completed
     */
    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * Checks whether the position of the given consumer group on the partition is strictly greater than the offset.
     * <p>
     * A new consumer is created, assigned to the partition, queried for its position and closed on every call.
     *
     * @param group the consumer group to check
     * @param topicPartition the partition to check
     * @param offset the offset that must have been passed
     * @return {@code true} if the reported position is positive and greater than {@code offset}
     */
    protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, this is expensive to create a consumer each time
        // but this is needed, an open consumer is not properly updated
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

    /**
     * Closes the tracked tailers and the producer, then marks the appender as closed.
     * <p>
     * A failure to close one tailer is logged with its cause and does not prevent the remaining tailers and the
     * producer from being closed.
     */
    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                // best effort: keep closing the others, but do not lose the cause
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}