/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Kafka-based implementation of {@link LogTailer}, reading records from topic partitions that are either explicitly
 * assigned or dynamically subscribed through a consumer group.
 *
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final KafkaNamespace ns;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean consumerMoved;

    protected KafkaLogTailer(KafkaNamespace ns, String group, Properties consumerProps) {
        Objects.requireNonNull(group);
        this.ns = ns;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

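    /**
     * Creates a tailer with an explicit list of partitions, assigned manually to the underlying Kafka consumer,
     * without consumer group rebalancing.
     */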
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(KafkaNamespace ns,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(ns, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(ns.getTopicName(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns));
        return ret;
    }

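    /**
     * Creates a tailer that subscribes to the given log names through a Kafka consumer group; partitions are assigned
     * dynamically and the {@link RebalanceListener} is notified on rebalance.
     * <p>
     * A minimal usage sketch, assuming an existing {@code KafkaNamespace} {@code ns}, consumer properties
     * {@code props} and a hypothetical {@code MyMessage} class implementing {@link Externalizable}:
     *
     * <pre>{@code
     * LogTailer<MyMessage> tailer = KafkaLogTailer.createAndSubscribe(ns,
     *         Collections.singletonList("myLog"), "myGroup", props, null);
     * LogRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     * }</pre>
     */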
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(KafkaNamespace ns,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(ns, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + names.stream().collect(Collectors.joining("|"));
    }

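    /**
     * Reads the next record, polling Kafka when the local buffer is empty. Returns {@code null} if no record is
     * available within the timeout, and throws {@link RebalanceException} if a rebalance happened during the poll.
     */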
    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = messageOf(record.value());
        LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

    @SuppressWarnings("unchecked")
    protected M messageOf(Bytes value) {
        // TODO: switch to commons-lang3 SerializationUtils
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(value.get()))) {
            return (M) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

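    /**
     * Polls Kafka and refills the local record buffer, returning the number of records fetched.
     */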
    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Received a wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("Poll interrupted because the tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (!msg.isEmpty() && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

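    /**
     * Seeks the partition to its last committed offset, or to its beginning when nothing has been committed yet, and
     * returns the resulting offset.
     */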
    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

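    /**
     * Seeks the consumer to the given offset and discards any buffered records for that partition.
     */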
    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }

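    /**
     * Commits the beginning offsets of all assigned partitions and seeks back to them.
     */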
    @Override
    public void reset() {
        // commit the beginning offset of every assigned partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

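    /**
     * Resets the committed position of a single partition to its beginning offset and seeks to it.
     */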
    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

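    /**
     * Returns the offset of the first record whose timestamp is greater than or equal to the given timestamp, or
     * {@code null} when no such record exists.
     */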
    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

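    /**
     * Commits the offsets of the records returned by {@link #read(Duration)} since the last commit, or the current
     * consumer position when the consumer has been moved by a seek.
     */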
    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d",
                                               ns.getLogName(entry.getKey().topic()), entry.getKey().partition(),
                                               entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position, regardless of {@code lastOffsets} or
     * {@code lastCommittedOffsets}.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");

        Map<TopicPartition, OffsetAndMetadata> offsets =
                topicPartitions.stream().collect(toMap(tp -> tp, tp -> new OffsetAndMetadata(consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public void close() {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                // calling wakeup() enables a consumer blocked in a poll call to terminate
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, wake up the owner thread instead
                log.info("Closing tailer from another thread, sending wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is being closed from another thread
                log.warn("Discarding error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }


    @Override
    public String toString() {
        return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(ns.getLogName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

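    /**
     * {@link ConsumerRebalanceListener} callback: registers the new assignment and flags the rebalance so that the
     * next {@link #read(Duration)} raises a {@link RebalanceException}.
     */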
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

}