/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Apache Kafka implementation of {@link MQAppender}: serializes messages with Java object serialization and
 * publishes them synchronously to a given partition of a Kafka topic.
 *
 * @param <M> the type of message appended
 * @since 9.2
 */
public class KafkaMQAppender<M extends Externalizable> implements MQAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaMQAppender.class);

    /** Upper bound, in milliseconds, of the pause between two checks in {@link #waitFor}. */
    private static final long POLL_INTERVAL_MS = 100;

    private final String topic;
    private final Properties consumerProps;
    private final Properties producerProps;
    // number of partitions of the topic, read once when the appender is created
    private final int size;
    private KafkaProducer<String, Bytes> producer;
    // keep track of created tailers to make sure they are closed
    private final ConcurrentLinkedQueue<KafkaMQTailer<M>> tailers = new ConcurrentLinkedQueue<>();
    private final String name;
    private boolean closed;

    /**
     * Opens an appender on an existing topic.
     *
     * @param topic the Kafka topic to publish to
     * @param name the MQueue name, used in returned offsets and in log messages
     * @param producerProperties the configuration handed to the {@link KafkaProducer}
     * @param consumerProperties the base configuration of the consumers created by {@link #waitFor}
     * @return a new appender
     */
    public static <M extends Externalizable> KafkaMQAppender<M> open(String topic, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaMQAppender<>(topic, name, producerProperties, consumerProperties);
    }

    private KafkaMQAppender(String topic, String name, Properties producerProperties,
            Properties consumerProperties) {
        this.topic = topic;
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        KafkaProducer<String, Bytes> newProducer = new KafkaProducer<>(producerProperties);
        int partitions;
        try {
            partitions = newProducer.partitionsFor(topic).size();
        } catch (RuntimeException e) {
            // do not leak the producer when the topic metadata can not be fetched
            newProducer.close();
            throw e;
        }
        this.producer = newProducer;
        this.size = partitions;
        log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
    }

    /**
     * Returns the MQueue name given when the appender was opened.
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * Returns the Kafka topic this appender publishes to.
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Returns the number of partitions of the topic, as reported when the appender was created.
     */
    @Override
    public int size() {
        return size;
    }

    /**
     * Serializes the message and sends it to the given partition, blocking until the send completes.
     * The record key is the partition number rendered as a string.
     *
     * @param partition the target partition
     * @param message the message to publish
     * @return the offset of the published record
     * @throws RuntimeException if the send fails or if the calling thread is interrupted while waiting; in the
     *             latter case the interrupt flag of the thread is restored
     */
    @Override
    public MQOffset append(int partition, Externalizable message) {
        Bytes value = Bytes.wrap(messageAsByteArray(message));
        String key = String.valueOf(partition);
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unable to send record: " + record, e);
        }
        MQOffset ret = new MQOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name,
                    partition, ret.offset(), len, key, message));
        }
        return ret;
    }

    /**
     * Serializes the message with Java object serialization.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the serialization fails
     */
    private byte[] messageAsByteArray(Externalizable message) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
            // the object stream buffers its output: flush before reading the bytes
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Waits until the position of the consumer group on the partition of the offset is greater than the offset,
     * or until the timeout expires. The position is checked once immediately, then again after pauses of at most
     * {@value #POLL_INTERVAL_MS} ms that never extend past the deadline.
     *
     * @param offset an offset previously returned by this MQueue
     * @param group the consumer group to check
     * @param timeout the maximum time to wait
     * @return {@code true} if the group position went past the offset before the timeout
     * @throws IllegalArgumentException if the offset belongs to an MQueue with a different name
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    @Override
    public boolean waitFor(MQOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " can not wait for an offset of a different MQueue: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            final long deadline = System.currentTimeMillis() + timeout.toMillis();
            while (!ret) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                // never sleep past the deadline
                Thread.sleep(Math.min(POLL_INTERVAL_MS, remaining));
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    /**
     * Returns {@code true} once {@link #close()} has completed.
     */
    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * Checks whether the position of the group on the partition is greater than the offset, using a short-lived
     * consumer configured with a copy of the consumer properties and the given group id.
     */
    private boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, this is expensive to create a consumer each time
        // but this is needed, an open consumer is not properly updated
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = (last > 0) && (last > offset);
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

    /**
     * Closes the tracked tailers, then the producer. A tailer that fails to close is logged and does not prevent
     * the remaining tailers and the producer from being closed.
     */
    @Override
    public void close() throws Exception {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                // keep the cause in the log, closing is best effort
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}