/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 *
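 * <p>
 * A minimal usage sketch (illustrative, not part of the original code; assumes {@code producerProps} and
 * {@code consumerProps} are {@link Properties} already configured with the Kafka bootstrap servers and the
 * expected serializers, and that {@code MyMessage} is a hypothetical {@link Externalizable} message class):
 *
 * <pre>{@code
 * KafkaLogAppender<MyMessage> appender = KafkaLogAppender.open("nuxeo-topic", "myLog", producerProps, consumerProps);
 * LogOffset offset = appender.append(0, new MyMessage());
 * appender.waitFor(offset, "myGroup", Duration.ofSeconds(10));
 * appender.close();
 * }</pre>
 *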
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final String name;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

    private KafkaLogAppender(String topic, String name, Properties producerProperties, Properties consumerProperties) {
        this.topic = topic;
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = producer.partitionsFor(topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
        }
    }

    public static <M extends Externalizable> KafkaLogAppender<M> open(String topic, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(topic, name, producerProperties, consumerProperties);
    }

    @Override
    public String name() {
        return name;
    }

    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

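    /**
     * Serializes the message using {@link Externalizable} and sends it synchronously: the call blocks until the
     * broker acknowledges the record, so the returned offset is the one actually assigned by Kafka.
     */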
    @Override
    public LogOffset append(int partition, Externalizable message) {
        Bytes value = Bytes.wrap(messageAsByteArray(message));
        String key = String.valueOf(partition);
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

    protected byte[] messageAsByteArray(Externalizable message) {
        // use try-with-resources so the ObjectOutputStream is always closed
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

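    /**
     * Waits until the given consumer group has processed the record at {@code offset}, re-checking the group
     * position at most every 100 ms until the timeout elapses.
     */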
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset of a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

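    /**
     * Checks whether the given consumer group position on the partition is past {@code offset}.
     */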
    protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, it is expensive to create a consumer for each check,
        // but it is needed because an open consumer is not properly updated
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}