/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
/**
 * Apache Kafka implementation of Log.
 *
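 * <p>
 * A minimal usage sketch; the {@code ns}, {@code message} and {@code MyMessage} names and the property values
 * are illustrative assumptions, not a prescribed configuration:
 *
 * <pre>{@code
 * Properties producerProps = new Properties();
 * producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 * // plus the String key / Bytes value serializer settings expected by the producer
 * Properties consumerProps = new Properties();
 * consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 * // plus the matching String / Bytes deserializer settings for the consumer
 * try (KafkaLogAppender<MyMessage> appender = KafkaLogAppender.open(new SerializableCodec<>(), ns, "myLog",
 *         producerProps, consumerProps)) {
 *     LogOffset offset = appender.append("routing-key", message);
 * }
 * }</pre>
 *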
 * @since 9.3
 */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);

    protected final String topic;

    protected final Properties consumerProps;

    protected final Properties producerProps;

    protected final int size;

    // keep track of created tailers to make sure they are closed
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final String name;

    protected final KafkaNamespace ns;

    protected final Codec<M> codec;

    protected final Codec<M> encodingCodec;

    protected KafkaProducer<String, Bytes> producer;

    protected boolean closed;

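    // sequence appended to the producer client.id so that several appenders in the same JVM get distinct ids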
    protected static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    private KafkaLogAppender(Codec<M> codec, KafkaNamespace ns, String name, Properties producerProperties,
            Properties consumerProperties) {
        Objects.requireNonNull(codec);
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            this.encodingCodec = new SerializableCodec<>();
        } else {
            this.encodingCodec = codec;
        }
        this.ns = ns;
        this.topic = ns.getTopicName(name);
        this.name = name;
        this.producerProps = producerProperties;
        this.consumerProps = consumerProperties;
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG,
                name + "-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = producer.partitionsFor(topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", name, topic, size));
        }
    }

    public static <M extends Externalizable> KafkaLogAppender<M> open(Codec<M> codec, KafkaNamespace ns, String name,
            Properties producerProperties, Properties consumerProperties) {
        return new KafkaLogAppender<>(codec, ns, name, producerProperties, consumerProperties);
    }

    @Override
    public String name() {
        return name;
    }

    public String getTopic() {
        return topic;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public LogOffset append(String key, M message) {
        Objects.requireNonNull(key);
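        // derive a stable partition from the key: mask the sign bit to keep the hash non-negative,
        // then take it modulo the partition count so records sharing a key land on the same partition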
        int partition = (key.hashCode() & 0x7fffffff) % size;
        return append(partition, key, message);
    }

    @Override
    public LogOffset append(int partition, M message) {
        String key = String.valueOf(partition);
        return append(partition, key, message);
    }

    public LogOffset append(int partition, String key, M message) {
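        // encode the message with the effective codec and wrap it so Kafka can carry the raw bytes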
        Bytes value = Bytes.wrap(encodingCodec.encode(message));
        ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, partition, key, value);
        Future<RecordMetadata> future = producer.send(record);
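        // block on the future so the send is acknowledged and the record offset is known before returning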
        RecordMetadata result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException("Unable to send record: " + record, e);
        }
        LogOffset ret = new LogOffsetImpl(name, partition, result.offset());
        if (log.isDebugEnabled()) {
            int len = record.value().get().length;
            log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", name, partition, ret.offset(),
                    len, key, message));
        }
        return ret;
    }

    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret = false;
        if (!name.equals(offset.partition().name())) {
            throw new IllegalArgumentException(name + " cannot wait for an offset with a different Log: " + offset);
        }
        TopicPartition topicPartition = new TopicPartition(topic, offset.partition().partition());
        try {
            ret = isProcessed(group, topicPartition, offset.offset());
            if (ret) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long delay = Math.min(100, timeoutMs);
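            // poll the group position until the offset is processed or the deadline expires,
            // sleeping at most 100 ms between checks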
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(group, topicPartition, offset.offset());
            }
            return ret;
        } finally {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + offset + "/" + group + " returns: " + ret);
            }
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public String toString() {
        return "KafkaLogAppender{" + "name='" + name + '\'' + ", size=" + size + ", ns=" + ns + ", closed=" + closed
                + ", codec=" + codec + '}';
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    protected boolean isProcessed(String group, TopicPartition topicPartition, long offset) {
        // TODO: find a better way, creating a consumer on each call is expensive,
        // but it is needed because an already open consumer does not reflect the group's committed position
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        try (KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(topicPartition));
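            // the consumer position starts at the group's committed offset, so the record is
            // considered processed once the position has moved past it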
            long last = consumer.position(topicPartition);
            boolean ret = last > 0 && last > offset;
            if (log.isDebugEnabled()) {
                log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + group
                        + ":+" + offset + "? " + ret + ", current position: " + last);
            }
            return ret;
        }
    }

    @Override
    public void close() {
        log.debug("Closing appender: " + name);
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        if (producer != null) {
            producer.close();
            producer = null;
        }
        closed = true;
    }
}