/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final String prefix;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // keep track of all tailers on the same namespace index even from different mq
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected KafkaLogTailer(String prefix, String group, Properties consumerProps) {
        Objects.requireNonNull(group);
        this.prefix = prefix;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

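    /**
     * Creates a tailer with an explicit partition assignment; the consumer group is used only to track committed
     * offsets, no group rebalancing takes place.
     */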
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(String prefix,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(prefix, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(prefix + partition.name(),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(String prefix,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(prefix, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(name -> prefix + name).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + names.stream().collect(Collectors.joining("|"));
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = messageOf(record.value());
        LogPartition partition = LogPartition.of(getNameForTopic(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

    protected String getNameForTopic(String topic) {
        return topic.replaceFirst(prefix, "");
    }

    @SuppressWarnings("unchecked")
    protected M messageOf(Bytes value) {
        // TODO: switch to commons-lang3 SerializationUtils
        ByteArrayInputStream bis = new ByteArrayInputStream(value.get());
        ObjectInput in = null;
        try {
            in = new ObjectInputStream(bis);
            return (M) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                // ignore close exception
            }
        }
    }

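    /**
     * Fetches records from the broker and buffers them in the {@code records} queue; returns the number of records
     * fetched.
     */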
    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + getNameForTopic(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (org.apache.kafka.common.errors.InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.size() > 0) {
                log.debug(msg);
            } else {
                log.trace(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", getNameForTopic(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (msg.length() > 0) {
            if (log.isInfoEnabled()) {
                log.info("toLastCommitted offsets: " + group + ":" + msg);
            }
        }
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", getNameForTopic(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(prefix + offset.partition().name(),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
    }

    @Override
    public void reset() {
        // we just commit the first offset
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

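    /**
     * Resets a single partition: commits its beginning offset and seeks the consumer back to it.
     */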
    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public void commit() {
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d",
                                               getNameForTopic(entry.getKey().topic()), entry.getKey().partition(),
                                               entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(prefix + partition.name(), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public void close() {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                // calling wakeup enables a consumer blocked in a poll call to terminate
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception, try to wake up the owner
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another
                // thread.
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("interrupted", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "prefix='" + prefix + '\'' + ", id=" + id + ", closed=" + closed + '}';
    }

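    // ConsumerRebalanceListener callbacks below are invoked by the Kafka consumer from within poll()
    // when the group membership changes.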
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(getNameForTopic(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(getNameForTopic(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }

}