/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.NameResolver;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
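 * Kafka-based {@link LogTailer}: reads records from Kafka topics on behalf of a consumer group and keeps track of
 * the offsets to commit.
 * <p>
 * A minimal usage sketch; the message type, log and group names, codec, resolver and consumer properties are
 * placeholders, tailers are normally obtained through the Kafka log manager rather than created directly, and the
 * caller has to handle the {@link InterruptedException} thrown by {@code read}:
 *
 * <pre>{@code
 * // MyMessage implements Externalizable; codec, resolver and consumerProperties are provided by the caller
 * LogTailer<MyMessage> tailer = KafkaLogTailer.createAndAssign(codec, resolver,
 *         Collections.singletonList(LogPartition.of(Name.ofUrn("ns/log"), 0)), Name.ofUrn("ns/group"),
 *         consumerProperties);
 * LogRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1)); // null if no record within the timeout
 * if (record != null) {
 *     // process record.message() then acknowledge the position
 *     tailer.commit();
 * }
 * tailer.close();
 * }</pre>
 *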
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final Name group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final Codec<M> codec;

    protected final Codec<M> decodeCodec;

    protected final NameResolver resolver;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // true once the tailer has been closed
    protected boolean closed;

    protected Collection<Name> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean isRevoked;

    protected boolean isLost;

    protected boolean consumerMoved;

    protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    protected long lastPollTimestamp;

    protected int lastPollSize;

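    /**
     * Builds the underlying {@link KafkaConsumer} for the group. When {@code codec} is {@code NO_CODEC}, records are
     * decoded with the legacy {@link SerializableCodec}.
     */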
    protected KafkaLogTailer(Codec<M> codec, NameResolver resolver, Name group, Properties consumerProps) {
        this.codec = codec;
        this.resolver = resolver;
        if (NO_CODEC.equals(codec)) {
            this.decodeCodec = new SerializableCodec<>();
        } else {
            this.decodeCodec = codec;
        }
        Objects.requireNonNull(group);
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, resolver.getId(group));
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,
                resolver.getId(group) + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

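    /**
     * Creates a tailer with a static partition assignment: the consumer is not part of the group rebalancing and
     * reads only the given partitions.
     */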
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec,
            NameResolver resolver, Collection<LogPartition> partitions, Name group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, resolver, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(resolver.getId(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s", ret.id));
        return ret;
    }

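    /**
     * Creates a tailer subscribed to the given names: partitions are assigned dynamically by the group coordinator
     * and rebalance events are forwarded to the {@link RebalanceListener}.
     */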
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec,
            NameResolver resolver, Collection<Name> names, Name group, Properties consumerProps,
            RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, resolver, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(resolver::getId).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s", ret.id));
        return ret;
    }

    protected static String buildId(Name group, Collection<LogPartition> partitions) {
        return group.getId() + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(Name group, Collection<Name> names) {
        return group.getId() + ":" + names.stream().map(Name::getId).collect(Collectors.joining("|"));
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced || isRevoked || isLost) {
                if (isRebalanced) {
                    log.debug("Rebalance happened during poll, raising exception");
                    isRebalanced = false;
                } else {
                    log.warn("Incomplete rebalance during poll, raising exception, revoked: " + isRevoked + ", lost: "
                            + isLost);
                    isRevoked = isLost = false;
                }
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = decodeCodec.decode(record.value().get());
        LogPartition partition = LogPartition.of(resolver.getName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

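    /**
     * Polls Kafka and buffers the returned records, returning how many were fetched. Kafka's
     * {@link InterruptException} is translated into an {@link InterruptedException}, and a {@link WakeupException}
     * closes the tailer.
     */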
    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            lastPollTimestamp = System.currentTimeMillis();
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout)) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + resolver.getName(record.topic()).getUrn() + ":"
                            + record.partition() + ":+" + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Received wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because the tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        lastPollSize = records.size();
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", resolver.getName(tp.topic()).getUrn(),
                                     tp.partition(), toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (msg.length() > 0 && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

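    /**
     * Seeks the partition to its last committed offset, falling back to the beginning of the partition when nothing
     * has been committed yet, and returns the resulting offset.
     */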
    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            Map<TopicPartition, OffsetAndMetadata> offsetMeta = consumer.committed(
                    Collections.singleton(topicPartition));
            if (offsetMeta != null && offsetMeta.get(topicPartition) != null) {
                offset = offsetMeta.get(topicPartition).offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("toLastCommitted: %s-%02d:+%d", resolver.getName(topicPartition.topic()).getUrn(),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(resolver.getId(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }

    @Override
    public void reset() {
        // commit the beginning offset of each assigned partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        try {
            consumer.commitSync(offsetToCommit);
        } catch (CommitFailedException | RebalanceInProgressException e) {
            log.error("Failed to commit " + offsetToCommit + " assigned " + assignments() + " last committed: "
                    + lastCommittedOffsets + " last size: " + lastPollSize + " elapsed: "
                    + (System.currentTimeMillis() - lastPollTimestamp) + " ms, records polled: " + records.size());
            throw e;
        }
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d",
                                               resolver.getName(entry.getKey().topic()).getUrn(),
                                               entry.getKey().partition(), entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position, regardless of {@code lastOffsets} or
     * {@code lastCommittedOffsets}.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");

        Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream()
                                                                        .collect(toMap(tp -> tp,
                                                                                tp -> new OffsetAndMetadata(
                                                                                        consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public Name group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    @SuppressWarnings("squid:S1181")
    @Override
    public void close() {
        if (consumer != null) {
            log.debug("Closing tailer: " + id);
            try {
                // close the consumer; a consumer blocked in a poll() call is terminated by calling wakeup() from
                // another thread, see the catch clause below
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, try to wake up the owner thread
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another thread
                log.info("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "id=" + id + ", closed=" + closed + ", codec=" + codec + '}';
    }

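    // ConsumerRebalanceListener callbacks: they reset the tailer state and raise a flag so that the next read()
    // surfaces a RebalanceException to the caller.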
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(resolver.getName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        cleanDuringRebalancing();
        isRevoked = true;
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(resolver.getName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        cleanDuringRebalancing();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Collection<LogPartition> lost = partitions.stream()
                                                  .map(tp -> LogPartition.of(resolver.getName(tp.topic()),
                                                          tp.partition()))
                                                  .collect(Collectors.toList());
        log.warn(String.format("Rebalance partitions lost: %s", lost));
        id += "-lost";
        cleanDuringRebalancing();
        isLost = true;
        if (listener != null) {
            listener.onPartitionsLost(lost);
        }
    }

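    /**
     * Drops buffered records and cached offsets so the tailer restarts from a clean state after a rebalance.
     */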
    protected void cleanDuringRebalancing() {
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = isRevoked = isLost = false;
    }
}