/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
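 * Kafka-based implementation of a {@link LogTailer}, reading records either from an explicit assignment of partitions
 * or from a consumer-group subscription with dynamic rebalancing.
 *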
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final KafkaNamespace ns;

    protected final Codec<M> codec;

    protected final Codec<M> decodeCodec;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // keep track of all tailers on the same namespace index even from different mq
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean consumerMoved;

    protected KafkaLogTailer(Codec<M> codec, KafkaNamespace ns, String group, Properties consumerProps) {
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            this.decodeCodec = new SerializableCodec<>();
        } else {
            this.decodeCodec = codec;
        }
        Objects.requireNonNull(group);
        this.ns = ns;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

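    /**
     * Creates a tailer with a static assignment of partitions; offsets are tracked using the consumer group but no
     * rebalancing is involved.
     */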
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, KafkaNamespace ns,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(ns.getTopicName(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns));
        return ret;
    }

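    /**
     * Creates a tailer that subscribes to the given log names; partitions are assigned dynamically by the
     * consumer-group coordinator and rebalance events are forwarded to the {@link RebalanceListener}.
     */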
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, KafkaNamespace ns,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }

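    /**
     * Reads the next record, polling Kafka when the internal buffer is empty. Returns {@code null} when nothing is
     * available within the timeout, and throws {@link RebalanceException} when a rebalance happened during the poll.
     */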
    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = decodeCodec.decode(record.value().get());
        LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

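    /**
     * Fills the internal buffer with the records returned by a single consumer poll and returns their count.
     */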
    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Received wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (!msg.isEmpty() && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

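    /**
     * Seeks the partition to its last committed offset, falling back to the beginning of the partition when nothing
     * has been committed yet, and returns the resulting offset.
     */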
    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

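    /**
     * Moves the consumer to the given offset, discarding the buffered records of that partition and the tracking of
     * its last read offset.
     */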
    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }

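    /**
     * Resets the group by committing the beginning offset of every assigned partition, then repositions the consumer
     * on these committed offsets.
     */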
    @Override
    public void reset() {
        // commit the beginning offset of every assigned partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

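    /**
     * Returns the offset of the first record whose timestamp is greater than or equal to the given timestamp, or
     * {@code null} when there is no such record.
     */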
    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

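    /**
     * Commits the next offset to read for every partition read since the previous commit. After a seek, toStart or
     * toEnd, it falls back to {@link #forceCommit()} which commits the current consumer positions.
     */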
    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d", ns.getLogName(entry.getKey().topic()),
                                               entry.getKey().partition(), entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position regardless of {@code lastOffsets} or {@code lastCommittedOffsets}.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");

        Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream().collect(
                toMap(tp -> tp, tp -> new OffsetAndMetadata(consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

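    /**
     * Closes the consumer. When called from a thread that does not own the consumer, a wakeup is sent so that the
     * owning thread closes the tailer on its next poll.
     */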
    @SuppressWarnings("squid:S1181")
    @Override
    public void close() {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                // calling wakeup makes it possible to terminate a consumer blocked on a poll call
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, try to wake up the owner
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another thread
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" + codec + '}';
    }

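    /**
     * {@link ConsumerRebalanceListener} callback invoked when partitions are revoked during a rebalance; the event is
     * propagated to the registered {@link RebalanceListener}.
     */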
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(ns.getLogName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

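    /**
     * {@link ConsumerRebalanceListener} callback invoked when partitions are assigned after a rebalance; the internal
     * state is reset and {@code isRebalanced} is set so that the next read raises a {@link RebalanceException}.
     */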
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

}