/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * A {@link LogTailer} implementation that reads messages from Kafka topics, either with an explicit partition
 * assignment ({@link #createAndAssign}) or by subscribing to topics within a consumer group
 * ({@link #createAndSubscribe}).
 *
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final String prefix;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // true once close() has been called, the tailer cannot be used anymore
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected KafkaLogTailer(String prefix, String group, Properties consumerProps) {
        Objects.requireNonNull(group);
        this.prefix = prefix;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(String prefix,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(prefix, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(prefix + partition.name(),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(String prefix,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(prefix, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(name -> prefix + name).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
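            // a rebalance callback may have fired while polling, report it so the caller can
            // renegotiate its positions before consuming further records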
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = messageOf(record.value());
        LogPartition partition = LogPartition.of(getNameForTopic(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

    protected String getNameForTopic(String topic) {
        // note: replaceFirst interprets the prefix as a regular expression, prefixes are expected to be plain strings
        return topic.replaceFirst(prefix, "");
    }

    @SuppressWarnings("unchecked")
    protected M messageOf(Bytes value) {
        // deserialize the Externalizable message stored in the record value
        // TODO: switch to commons-lang3 SerializationUtils
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(value.get()))) {
            return (M) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + getNameForTopic(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
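        // an empty collection means: seek all currently assigned partitions (same convention for seekToBeginning)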
        consumer.seekToEnd(Collections.emptyList());
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        // the mapping below has the side effect of seeking each assigned partition to its last committed offset
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", getNameForTopic(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (!msg.isEmpty() && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
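            // no locally cached value, ask the broker for the last offset committed by this group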
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
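        // seek to the committed position when there is one, otherwise fall back to the beginning of the partition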
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", getNameForTopic(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(prefix + offset.partition().name(),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
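        // drop the local state of this partition so the next read starts from the new position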
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
    }

    @Override
    public void reset() {
        // committing the beginning offsets rewinds the group's committed position to the start of each partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public void commit() {
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
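        // Kafka expects the committed offset to be the position of the next record to consume, hence last offset + 1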
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d",
                                               getNameForTopic(entry.getKey().topic()), entry.getKey().partition(),
                                               entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public void close() {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                // calling wakeup enables terminating a consumer blocked in a poll call
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, try to wake up the owner
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another
                // thread.
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "prefix='" + prefix + '\'' + ", id=" + id + ", closed=" + closed + '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(getNameForTopic(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
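        // tag the id until a new assignment arrives, it is rebuilt in onPartitionsAssigned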
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(getNameForTopic(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
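        // drop local positions and buffered records, the caller is notified through the RebalanceException
        // raised by read() and can renegotiate its positions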
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

}