/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 *
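 * <p>
 * A minimal usage sketch; the broker address, serializer settings and message class below are illustrative
 * assumptions, not a complete or authoritative configuration:
 *
 * <pre>{@code
 * Properties producerProps = new Properties();
 * producerProps.put("bootstrap.servers", "localhost:9092"); // assumed local broker
 * producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 * producerProps.put("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
 *
 * Properties consumerProps = new Properties();
 * consumerProps.put("bootstrap.servers", "localhost:9092");
 * consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 * consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer");
 *
 * // MyMessage is a hypothetical Externalizable implementation, ns an existing KafkaNamespace
 * KafkaLogAppender<MyMessage> appender = KafkaLogAppender.open(ns, "myLog", producerProps, consumerProps);
 * LogOffset offset = appender.append(0, new MyMessage());
 * appender.close();
 * }</pre>
 *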
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final String name;

    protected final KafkaNamespace ns;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

    private KafkaLogAppender(KafkaNamespace ns, String name, Properties producerProperties,
            Properties consumerProperties) {
        this.ns = ns;
        this.topic = ns.getTopicName(name);
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = producer.partitionsFor(topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
        }
    }

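    /**
     * Opens an appender for the given Log name, resolving the backing topic through the namespace and creating the
     * Kafka producer immediately.
     */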
    public static <M extends Externalizable> KafkaLogAppender<M> open(KafkaNamespace ns, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(ns, name, producerProperties, consumerProperties);
    }

    @Override
    public String name() {
        return name;
    }

    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

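    /**
     * Appends the serialized message to the given partition, blocking until the broker acknowledges the record, and
     * returns the offset assigned to it.
     */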
    @Override
    public LogOffset append(int partition, Externalizable message) {
        Bytes value = Bytes.wrap(messageAsByteArray(message));
        String key = String.valueOf(partition);
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

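    /**
     * Serializes the message to a byte array using standard Java serialization ({@link ObjectOutputStream}).
     */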
    protected byte[] messageAsByteArray(Externalizable message) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

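    /**
     * Polls the consumer group's position on the partition, sleeping at most 100 ms between checks, until the offset
     * has been processed or the timeout expires.
     */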
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset on a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }


    @Override
    public boolean closed() {
        return closed;
    }

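    /**
     * Returns true if the group's current position on the partition is beyond the given offset, i.e. the record has
     * been processed by the group.
     */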
    protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, creating a consumer on each call is expensive,
        // but a long-lived consumer does not see the group's committed position advance
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

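    /**
     * Closes the tracked tailers and the underlying producer, then marks the appender as closed.
     */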
    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}