/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
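 * Kafka implementation of {@link LogTailer}: reads messages from the topics of a {@link KafkaNamespace}, either with
 * an explicit partition assignment ({@link #createAndAssign}) or through a consumer group subscription
 * ({@link #createAndSubscribe}).
 * <p>
 * A minimal usage sketch, not an official example: {@code codec}, {@code ns}, {@code partitions} and {@code props}
 * are assumed to be already configured, {@code MyMessage} stands for any {@link Externalizable} message class, and
 * {@link InterruptedException} handling is left to the caller:
 *
 * <pre>{@code
 * KafkaLogTailer<MyMessage> tailer = KafkaLogTailer.createAndAssign(codec, ns, partitions, "myGroup", props);
 * try {
 *     LogRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
 *     if (record != null) {
 *         // process the message, then commit the new position for the group
 *         tailer.commit();
 *     }
 * } finally {
 *     tailer.close();
 * }
 * }</pre>
 *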
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final KafkaNamespace ns;

    protected final Codec<M> codec;

    protected final Codec<M> decodeCodec;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // keep track of all tailers on the same namespace index even from different mq
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean consumerMoved;

    protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    protected KafkaLogTailer(Codec<M> codec, KafkaNamespace ns, String group, Properties consumerProps) {
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            this.decodeCodec = new SerializableCodec<>();
        } else {
            this.decodeCodec = codec;
        }
        Objects.requireNonNull(group);
        this.ns = ns;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, group + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

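    /**
     * Creates a tailer that reads only the given {@code partitions}, using an explicit Kafka assignment, without
     * relying on consumer group rebalancing.
     */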
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, KafkaNamespace ns,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(ns.getTopicName(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns));
        return ret;
    }

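    /**
     * Creates a tailer that subscribes to the given log {@code names}, letting Kafka assign partitions dynamically.
     * When a rebalancing happens during a poll, {@link #read} raises a {@link RebalanceException} and the provided
     * {@link RebalanceListener} is notified of the revoked and assigned partitions.
     * <p>
     * A possible read loop, shown only as a sketch: {@code tailer} comes from this factory, {@code process} is an
     * illustrative application callback, and {@link InterruptedException} handling is omitted as in the class-level
     * example:
     *
     * <pre>{@code
     * while (!tailer.closed()) {
     *     try {
     *         LogRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     *         if (record != null) {
     *             process(record);
     *             tailer.commit();
     *         }
     *     } catch (RebalanceException e) {
     *         // partitions have been reassigned, continue reading with the new assignment
     *     }
     * }
     * }</pre>
     */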
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, KafkaNamespace ns,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance occurred during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = decodeCodec.decode(record.value().get());
        LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout)) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Received wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because the tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (msg.length() > 0 && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }

    @Override
    public void reset() {
        // commit the beginning offset of every assigned partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
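        // a Kafka committed offset designates the next record to consume, hence the +1 on the last read offset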
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d", ns.getLogName(entry.getKey().topic()),
                                               entry.getKey().partition(), entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position, regardless of {@code lastOffsets} or {@code lastCommittedOffsets}.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");

        Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream()
                                                                        .collect(toMap(tp -> tp,
                                                                                tp -> new OffsetAndMetadata(
                                                                                        consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    @SuppressWarnings("squid:S1181")
    @Override
    public void close() {
        if (consumer != null) {
            log.debug("Closing tailer: " + id);
            try {
                // calling wakeup makes it possible to terminate a consumer blocked in a poll call
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, try to wake up the owner
                log.info("Closing tailer from another thread, sending wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another thread
                log.warn("Discarding error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" + codec + '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(ns.getLogName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
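        // flag the rebalance so that the next read() surfaces it to the caller as a RebalanceException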
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

}