/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Apache Kafka implementation of MQueue.
 *
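 * A minimal usage sketch; {@code MyMessage} is a placeholder for any {@link Externalizable}
 * payload, and the producer/consumer properties are assumed to point to a reachable Kafka
 * cluster where the topic already exists:
 *
 * <pre>{@code
 * try (KafkaMQAppender<MyMessage> appender = KafkaMQAppender.open("my-topic", "myQueue",
 *         producerProperties, consumerProperties)) {
 *     MQOffset offset = appender.append(0, new MyMessage());
 *     appender.waitFor(offset, "my-group", Duration.ofSeconds(10));
 * }
 * }</pre>
 *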
 * @since 9.2
 */
public class KafkaMQAppender<M extends Externalizable> implements MQAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaMQAppender.class);
    private final String topic;
    private final Properties consumerProps;
    private final Properties producerProps;
    private final int size;
    private KafkaProducer<String, Bytes> producer;
    // keep track of created tailers to make sure they are closed
    private final ConcurrentLinkedQueue<KafkaMQTailer<M>> tailers = new ConcurrentLinkedQueue<>();
    private final String name;
    private boolean closed;

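    /**
     * Opens an appender on an existing topic; the partition count is read from the cluster.
     */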
    public static <M extends Externalizable> KafkaMQAppender<M> open(String topic, String name, Properties producerProperties, Properties consumerProperties) {
        return new KafkaMQAppender<>(topic, name, producerProperties, consumerProperties);
    }

    private KafkaMQAppender(String topic, String name, Properties producerProperties, Properties consumerProperties) {
        this.topic = topic;
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = producer.partitionsFor(topic).size();
        log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
    }

    @Override
    public String name() {
        return name;
    }

    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

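    /**
     * Appends the message to the given partition, blocking until the broker acknowledges the
     * record, and returns the resulting offset.
     */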
    @Override
    public MQOffset append(int partition, Externalizable message) {
        Bytes value = Bytes.wrap(messageAsByteArray(message));
        String key = String.valueOf(partition);
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
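        // send synchronously: waiting on the future gives us the record offset assigned by the broker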
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unable to send record: " + record, e);
        }
        MQOffset ret = new MQOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name,
                    partition, ret.offset(), len, key, message));
        }
        return ret;
    }

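    // serialize the message using standard Java serialization (the payload is Externalizable)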
    private byte[] messageAsByteArray(Externalizable message) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(message);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

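    /**
     * Waits until the consumer group has processed the given offset, polling the group's
     * committed position until the offset is reached or the timeout expires.
     */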
    @Override
    public boolean waitFor(MQOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset of a different MQueue: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            final long timeoutMs = timeout.toMillis();
            final long deadline = System.currentTimeMillis() + timeoutMs;
            final long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    private boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way; creating a consumer on each call is expensive,
        // but it is needed because an already open consumer does not see the group's updated position
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
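            // position() is the offset of the next record this group would fetch, i.e. it reflects
            // the group's last committed offset (or the reset position if nothing is committed yet)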
            long last = consumer.position(topicPartition);
            boolean ret = (last > 0) && (last > offset);
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

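    /**
     * Closes every tailer created from this appender, then the producer itself.
     */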
    @Override
    public void close() throws Exception {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}