/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.NameResolver;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Apache Kafka implementation of Log.
 *
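 * <p>
 * Minimal usage sketch ({@code MyMessage} stands for any {@code Externalizable} payload; the codec, resolver, name
 * and properties below come from the caller's configuration):
 *
 * <pre>{@code
 * KafkaLogAppender<MyMessage> appender = KafkaLogAppender.open(codec, resolver, name, producerProps, consumerProps);
 * LogOffset offset = appender.append("routing-key", message);
 * appender.close();
 * }</pre>
 *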
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final Name name;

    protected final Codec<M> codec;

    protected final Codec<M> encodingCodec;

    protected final NameResolver resolver;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

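    /**
     * Creates the Kafka producer eagerly; when {@code NO_CODEC} is passed, messages are encoded with Java
     * serialization through {@link SerializableCodec}.
     */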
    private KafkaLogAppender(Codec<M> codec, NameResolver resolver, Name name, Properties producerProperties,
            Properties consumerProperties) {
        Objects.requireNonNull(codec);
        this.codec = codec;
        this.resolver = resolver;
        if (NO_CODEC.equals(codec)) {
            this.encodingCodec = new SerializableCodec<>();
        } else {
            this.encodingCodec = codec;
        }
        this.name = name;
        this.topic = resolver.getId(name);
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG,
                resolver.getId(this.name) + "-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = producer.partitionsFor(topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", this.name, topic, size));
        }
    }

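    /**
     * Opens an appender on the topic resolved from {@code name}; this is the only public way to build an instance,
     * the constructor being private.
     */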
    public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, NameResolver resolver, Name name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(codec, resolver, name, producerProperties, consumerProperties);
    }

    @Override
    public Name name() {
        return name;
    }

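    /**
     * Returns the Kafka topic backing this Log, as resolved from the Log {@link Name}.
     */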
    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

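    /**
     * Appends using a partition derived from the key's hash, so messages sharing a key always land on the same
     * partition.
     */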
    @Override
    public LogOffset append(String key, M message) {
        Objects.requireNonNull(key);
        // mask the sign bit so the hash maps to a non-negative partition index
        int partition = (key.hashCode() & 0x7fffffff) % size;
        return append(partition, key, message);
    }

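    /**
     * Appends to an explicit partition, using the partition number as the record key.
     */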
    @Override
    public LogOffset append(int partition, M message) {
        String key = String.valueOf(partition);
        return append(partition, key, message);
    }

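    /**
     * Encodes the message, sends the record and waits for the broker acknowledgment, returning the assigned offset.
     */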
    public LogOffset append(int partition, String key, M message) {
        Bytes value = Bytes.wrap(encodingCodec.encode(message));
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        // synchronous send: block on the future so the broker-assigned offset can be returned
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("Append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

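    /**
     * Polls the consumer group position every 100 ms (or less for shorter timeouts) until the offset is processed or
     * the timeout expires.
     */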
    @Override
    public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset on a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(100, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public String toString() {
        return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", closed=" + closed
                + ", codec=" + codec + '}';
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

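    /**
     * Tells whether the given consumer group position is past the offset. A fresh consumer is created on each call
     * because an already open consumer does not reflect the group's progress.
     */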
    protected boolean isProcessed(Name group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way; creating a consumer on each call is expensive, but it is needed because an open
        // consumer is not properly updated with the group position
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolver.getId(group));
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

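    /**
     * Closes every tailer tracked by this appender, then releases the producer and marks the appender as closed.
     */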
    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}