/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 *
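 * <p>
 * A minimal usage sketch; {@code codec}, {@code ns}, the properties, the Log name, the group and the
 * {@code MyMessage} type below are placeholders to be adapted to the actual deployment:
 *
 * <pre>{@code
 * KafkaLogAppender<MyMessage> appender = KafkaLogAppender.open(codec, ns, "myLog", producerProps, consumerProps);
 * LogOffset offset = appender.append("routingKey", message);
 * appender.waitFor(offset, "myGroup", Duration.ofSeconds(10));
 * appender.close();
 * }</pre>
 *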
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final String name;

    protected final KafkaNamespace ns;

    // the codec configured by the caller
    protected final Codec<M> codec;

    // the codec actually used to encode messages, Java serialization when no codec is configured
    protected final Codec<M> encodingCodec;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

    private KafkaLogAppender(Codec<M> codec, KafkaNamespace ns, String name, Properties producerProperties,
            Properties consumerProperties) {
        Objects.requireNonNull(codec);
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            // no codec configured: fall back to Java serialization of the Externalizable message
            this.encodingCodec = new SerializableCodec<>();
        } else {
            this.encodingCodec = codec;
        }
        this.ns = ns;
        this.topic = ns.getTopicName(name);
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        this.producer = new KafkaProducer<>(this.producerProps);
        // the Log size is the number of partitions of the backing topic
        this.size = producer.partitionsFor(topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
        }
    }

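    /**
     * Opens an appender on the given Log; the producer is created immediately, which also resolves the partition
     * count of the backing topic.
     */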
    public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, KafkaNamespace ns, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(codec, ns, name, producerProperties, consumerProperties);
    }

    @Override
    public String name() {
        return name;
    }

    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

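    /**
     * Appends to the partition selected by hashing the key, so messages sharing a key always land on the same
     * partition and keep their relative order.
     */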
    @Override
    public LogOffset append(String key, M message) {
        Objects.requireNonNull(key);
        // hash the key into a non-negative int, then map it onto a partition
        int partition = (key.hashCode() & 0x7fffffff) % size;
        return append(partition, key, message);
    }

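    /**
     * Appends to an explicit partition, using the partition number as the record key.
     */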
    @Override
    public LogOffset append(int partition, M message) {
        String key = String.valueOf(partition);
        return append(partition, key, message);
    }

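    /**
     * Appends synchronously: the call blocks until the broker acknowledges the record and returns the offset it was
     * assigned.
     */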
    public LogOffset append(int partition, String key, M message) {
        Bytes value = Bytes.wrap(encodingCodec.encode(message));
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            // wait for the broker acknowledgment to get the record offset
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

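    /**
     * Waits for the given offset to be processed by the consumer group, polling {@link #isProcessed} until it returns
     * {@code true} or the timeout expires.
     */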
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset from a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            // poll at most every 100ms, or faster when the timeout is shorter
            long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public String toString() {
        return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", ns=" + ns + ", closed=" + closed
                + ", codec=" + codec + '}';
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

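    /**
     * Checks whether the consumer group position on the partition is past the given offset, using a short-lived
     * consumer.
     */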
    protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, creating a consumer on each call is expensive,
        // but it is needed because an already open consumer is not properly updated
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            // the position of a freshly created consumer is based on the group's committed offset
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

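    /**
     * Closes the tailers tracked by this appender, then releases the producer and marks the appender as closed.
     */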
    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}