/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

/**
 * Kafka implementation of {@link MQTailer}, reading serialized messages from Kafka topics with a {@link KafkaConsumer}.
 *
 * @since 9.2
 */
public class KafkaMQTailer<M extends Externalizable> implements MQTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaMQTailer.class);
    private final String group;
    private final String prefix;
    private KafkaConsumer<String, Bytes> consumer;
    private String id;
    private Collection<TopicPartition> topicPartitions;
    private Collection<MQPartition> partitions;
    private final Map<TopicPartition, Long> lastOffsets = new HashMap<>();
    private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
    // buffer of polled records not yet returned by read
    private final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();
    private boolean closed = false;
    private Collection<String> names;
    private MQRebalanceListener listener;
    private boolean isRebalanced = false;

    protected KafkaMQTailer(String prefix, String group, Properties consumerProps) {
        Objects.requireNonNull(group);
        this.prefix = prefix;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    public static <M extends Externalizable> KafkaMQTailer<M> createAndAssign(String prefix, Collection<MQPartition> partitions, String group, Properties consumerProps) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<>(prefix, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                .map(partition -> new TopicPartition(prefix + partition.name(), partition.partition()))
                .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, prefix));
        return ret;
    }
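
    /*
     * Illustrative usage sketch, not part of this class: reading from explicitly
     * assigned partitions. The bootstrap server, queue name, group and message
     * type below are hypothetical; the deserializer settings match the
     * String/Bytes consumer used here.
     *
     *   Properties props = new Properties();
     *   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     *   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
     *           "org.apache.kafka.common.serialization.StringDeserializer");
     *   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
     *           "org.apache.kafka.common.serialization.BytesDeserializer");
     *   try (KafkaMQTailer<MyMessage> tailer = KafkaMQTailer.createAndAssign(
     *           "nuxeo-", Collections.singletonList(MQPartition.of("myQueue", 0)), "myGroup", props)) {
     *       MQRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     *   }
     */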

    public static <M extends Externalizable> KafkaMQTailer<M> createAndSubscribe(String prefix, Collection<String> names, String group, Properties consumerProps,
                                                                                 MQRebalanceListener listener) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<>(prefix, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(name -> prefix + name).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, prefix));
        return ret;
    }
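
    /*
     * Illustrative usage sketch, not part of this class: subscribing to queues
     * with consumer-group rebalancing. When partitions are reassigned during a
     * poll, read throws MQRebalanceException; a typical reaction is to resume
     * from the last committed offsets. Names below are hypothetical.
     *
     *   try (KafkaMQTailer<MyMessage> tailer = KafkaMQTailer.createAndSubscribe(
     *           "nuxeo-", Collections.singletonList("myQueue"), "myGroup", props, myListener)) {
     *       try {
     *           MQRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     *       } catch (MQRebalanceException e) {
     *           tailer.toLastCommitted();
     *       }
     *   }
     */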

    private static String buildId(String group, Collection<MQPartition> partitions) {
        return group + ":" + partitions.stream().map(MQPartition::toString).collect(Collectors.joining("|"));
    }

    private static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }

    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new MQRebalanceException();
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data for " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = messageOf(record.value());
        MQPartition partition = MQPartition.of(getNameForTopic(record.topic()), record.partition());
        MQOffset offset = new MQOffsetImpl(partition, record.offset());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new MQRecord<>(partition, value, offset);
    }

    private String getNameForTopic(String topic) {
        // topics are always created as prefix + name, so strip the prefix literally:
        // replaceFirst would wrongly interpret the prefix as a regular expression
        return topic.substring(prefix.length());
    }

    @SuppressWarnings("unchecked")
    private M messageOf(Bytes value) {
        // the record value holds a message serialized with standard Java serialization
        ByteArrayInputStream bis = new ByteArrayInputStream(value.get());
        try (ObjectInput in = new ObjectInputStream(bis)) {
            return (M) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
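
    /*
     * Note: messageOf assumes the producer wrote the record value with standard
     * Java serialization. A matching writer, presumably on the appender side,
     * would look like this sketch:
     *
     *   ByteArrayOutputStream bos = new ByteArrayOutputStream();
     *   try (ObjectOutput out = new ObjectOutputStream(bos)) {
     *       out.writeObject(message);
     *   }
     *   Bytes value = Bytes.wrap(bos.toByteArray());
     */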

    private int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + getNameForTopic(record.topic()) + ":" + record.partition() + ":+" + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the Kafka consumer was interrupted, restore the interrupt status and propagate
            Thread.currentThread().interrupt();
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            try {
                close();
            } catch (Exception e1) {
                log.warn("Error while closing the tailer " + this);
            }
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (records.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("Polling " + id + " returns no record");
            }
        } else if (log.isDebugEnabled()) {
            log.debug("Polling " + id + " returns " + records.size() + " records");
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        // use the current assignment so this also works for tailers created with subscribe,
        // where topicPartitions is not set
        consumer.seekToEnd(consumer.assignment());
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(consumer.assignment());
    }

    @Override
    public void toLastCommitted() {
        log.debug("toLastCommitted tailer: " + id);
        String msg = consumer.assignment().stream()
                .map(tp -> String.format("%s-%02d:+%d", getNameForTopic(tp.topic()), tp.partition(), toLastCommitted(tp)))
                .collect(Collectors.joining("|"));
        if (!msg.isEmpty()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
    }

    private long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            // no committed offset for this group, start from the beginning
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        log.debug(String.format("toLastCommitted: %s-%02d:+%d", getNameForTopic(topicPartition.topic()),
                topicPartition.partition(), offset));
        return offset;
    }

    public void seek(MQPartition partition, MQOffset offset) {
        log.debug("seek tailer: " + id + " +" + offset);
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        consumer.seek(topicPartition, offset.offset());
        // drop buffered records, they may come from before or after the new position
        records.clear();
    }

    @Override
    public void commit() {
        // commit the position after the last record read on each partition,
        // the committed offset points to the next record to consume
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet().stream().map(entry -> String.format("%s-%02d:+%d",
                    getNameForTopic(entry.getKey().topic()), entry.getKey().partition(), entry.getValue().offset()))
                    .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }
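
    /*
     * Illustrative sketch, not part of this class: an at-least-once consumer
     * loop pairing read and commit. On failure before commit, the records read
     * since the last commit are delivered again after toLastCommitted.
     * MyMessage and process are hypothetical.
     *
     *   for (;;) {
     *       MQRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     *       if (record == null) {
     *           continue; // no message within the timeout
     *       }
     *       process(record.value());
     *       tailer.commit();
     *   }
     */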

    @Override
    public MQOffset commit(MQPartition partition) {
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            log.debug("Unchanged partition, nothing to commit: " + partition);
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        MQOffset ret = new MQOffsetImpl(partition, offset);
        if (log.isDebugEnabled()) {
            log.debug("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<MQPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public void close() throws Exception {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception: send a wakeup so the owner
                // thread exits its blocking poll and closes the consumer itself
                log.info("Closing tailer from another thread, sending wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is being closed
                // from another thread
                log.warn("Discarding error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaMQTailer{" +
                "prefix='" + prefix + '\'' +
                ", id=" + id +
                ", closed=" + closed +
                '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<MQPartition> revoked = partitions.stream()
                .map(tp -> MQPartition.of(getNameForTopic(tp.topic()), tp.partition()))
                .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream().map(tp -> MQPartition.of(getNameForTopic(tp.topic()), tp.partition()))
                .collect(Collectors.toList());
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        // flag the rebalance so the next read raises MQRebalanceException
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }
}