001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import java.io.ByteArrayOutputStream; 022import java.io.Externalizable; 023import java.io.IOException; 024import java.io.ObjectOutput; 025import java.io.ObjectOutputStream; 026import java.time.Duration; 027import java.util.Collections; 028import java.util.Objects; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.Future; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.apache.kafka.clients.consumer.ConsumerConfig; 037import org.apache.kafka.clients.consumer.KafkaConsumer; 038import org.apache.kafka.clients.producer.KafkaProducer; 039import org.apache.kafka.clients.producer.ProducerRecord; 040import org.apache.kafka.clients.producer.RecordMetadata; 041import org.apache.kafka.common.TopicPartition; 042import org.apache.kafka.common.utils.Bytes; 043import org.nuxeo.lib.stream.log.LogOffset; 044import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 045import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 046 047/** 048 * Apache Kafka implementation of Log. 049 * 050 * @since 9.3 051 */ 052public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> { 053 private static final Log log = LogFactory.getLog(KafkaLogAppender.class); 054 055 protected final String topic; 056 057 protected final Properties consumerProps; 058 059 protected final Properties producerProps; 060 061 protected final int size; 062 063 // keep track of created tailers to make sure they are closed 064 protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 065 066 protected final String name; 067 068 protected final KafkaNamespace ns; 069 070 protected KafkaProducer<String, Bytes> producer; 071 072 protected boolean closed; 073 074 private KafkaLogAppender(KafkaNamespace ns, String name, Properties producerProperties, Properties consumerProperties) { 075 this.ns = ns; 076 this.topic = ns.getTopicName(name); 077 this.name = name; 078 this.producerProps = producerProperties; 079 this.consumerProps = consumerProperties; 080 this.producer = new KafkaProducer<>(this.producerProps); 081 this.size = producer.partitionsFor(topic).size(); 082 if (log.isDebugEnabled()) { 083 log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size)); 084 } 085 } 086 087 public static <M extends Externalizable> KafkaLogAppender<M> open(KafkaNamespace ns, String name, 088 Properties producerProperties, Properties consumerProperties) { 089 return new KafkaLogAppender<>(ns, name, producerProperties, consumerProperties); 090 } 091 092 @Override 093 public String name() { 094 return name; 095 } 096 097 public String getTopic() { 098 return topic; 099 } 100 101 @Override 102 public int size() { 103 return size; 104 } 105 106 @Override 107 public LogOffset append(int partition, Externalizable message) { 108 Bytes value = Bytes.wrap(messageAsByteArray(message)); 109 String key = String.valueOf(partition); 110 ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value); 111 Future<RecordMetadata> future = producer.send(record); 112 RecordMetadata result; 113 try { 114 result = future.get(); 115 } catch (InterruptedException e) { 116 Thread.currentThread().interrupt(); 117 throw new RuntimeException("Unable to send record: " + record, e); 118 } catch (ExecutionException e) { 119 throw new RuntimeException("Unable to send record: " + record, e); 120 } 121 LogOffset ret = new LogOffsetImpl(name, partition, result.offset()); 122 if (log.isDebugEnabled()) { 123 int len = record.value().get().length; 124 log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(), 125 len, key, message)); 126 } 127 return ret; 128 } 129 130 protected byte[] messageAsByteArray(Externalizable message) { 131 ObjectOutput out; 132 try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { 133 out = new ObjectOutputStream(bos); 134 out.writeObject(message); 135 out.flush(); 136 return bos.toByteArray(); 137 } catch (IOException e) { 138 throw new RuntimeException(e); 139 } 140 } 141 142 @Override 143 public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException { 144 boolean ret = false; 145 if (!name.equals(offset.partition().name())) { 146 throw new IllegalArgumentException(name + " can not wait for an offset with a different Log: " + offset); 147 } 148 TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition()); 149 try { 150 ret = isProcessed(group, topicPartition, offset.offset()); 151 if (ret) { 152 return true; 153 } 154 long timeoutMs = timeout.toMillis(); 155 long deadline = System.currentTimeMillis() + timeoutMs; 156 long delay = Math.min(100, timeoutMs); 157 while (!ret && System.currentTimeMillis() < deadline) { 158 Thread.sleep(delay); 159 ret = isProcessed(group, topicPartition, offset.offset()); 160 } 161 return ret; 162 } finally { 163 if (log.isDebugEnabled()) { 164 log.debug("waitFor " + offset + "/" + group + " returns: " + ret); 165 } 166 } 167 } 168 169 @Override 170 public boolean closed() { 171 return closed; 172 } 173 174 protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) { 175 // TODO: find a better way, this is expensive to create a consumer each time 176 // but this is needed, an open consumer is not properly updated 177 Properties props = (Properties) consumerProps.clone(); 178 props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group)); 179 try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) { 180 consumer.assign(Collections.singletonList(topicPartition)); 181 long last = consumer.position(topicPartition); 182 boolean ret = last > 0 && last > offset; 183 if (log.isDebugEnabled()) { 184 log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group 185 + ":+" + offset + "? " + ret + ", current position: " + last); 186 } 187 return ret; 188 } 189 } 190 191 @Override 192 public void close() { 193 log.debug("Closing appender: " + name); 194 tailers.stream().filter(Objects::nonNull).forEach(tailer -> { 195 try { 196 tailer.close(); 197 } catch (Exception e) { 198 log.error("Failed to close tailer: " + tailer); 199 } 200 }); 201 tailers.clear(); 202 if (producer != null) { 203 producer.close(); 204 producer = null; 205 } 206 closed = true; 207 } 208}