001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import static java.util.stream.Collectors.toMap; 022 023import java.io.ByteArrayInputStream; 024import java.io.Externalizable; 025import java.io.IOException; 026import java.io.ObjectInput; 027import java.io.ObjectInputStream; 028import java.time.Duration; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.ConcurrentModificationException; 032import java.util.HashMap; 033import java.util.LinkedList; 034import java.util.Map; 035import java.util.Objects; 036import java.util.Properties; 037import java.util.Queue; 038import java.util.stream.Collectors; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.apache.kafka.clients.consumer.ConsumerConfig; 043import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 044import org.apache.kafka.clients.consumer.ConsumerRecord; 045import org.apache.kafka.clients.consumer.KafkaConsumer; 046import org.apache.kafka.clients.consumer.OffsetAndMetadata; 047import org.apache.kafka.clients.consumer.OffsetAndTimestamp; 048import org.apache.kafka.common.TopicPartition; 049import org.apache.kafka.common.errors.InterruptException; 050import org.apache.kafka.common.errors.WakeupException; 051import org.apache.kafka.common.utils.Bytes; 052import org.nuxeo.lib.stream.log.LogOffset; 053import org.nuxeo.lib.stream.log.LogPartition; 054import org.nuxeo.lib.stream.log.LogRecord; 055import org.nuxeo.lib.stream.log.LogTailer; 056import org.nuxeo.lib.stream.log.RebalanceException; 057import org.nuxeo.lib.stream.log.RebalanceListener; 058import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 059 060/** 061 * @since 9.3 062 */ 063public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener { 064 private static final Log log = LogFactory.getLog(KafkaLogTailer.class); 065 066 protected final String group; 067 068 protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>(); 069 070 protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>(); 071 072 protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>(); 073 074 protected final KafkaNamespace ns; 075 076 protected KafkaConsumer<String, Bytes> consumer; 077 078 protected String id; 079 080 protected Collection<TopicPartition> topicPartitions; 081 082 protected Collection<LogPartition> partitions; 083 084 // keep track of all tailers on the same namespace index even from different mq 085 protected boolean closed; 086 087 protected Collection<String> names; 088 089 protected RebalanceListener listener; 090 091 protected boolean isRebalanced; 092 093 protected boolean consumerMoved; 094 095 protected KafkaLogTailer(KafkaNamespace ns, String group, Properties consumerProps) { 096 Objects.requireNonNull(group); 097 this.ns = ns; 098 this.group = group; 099 // this.prefixedGroup = group; 100 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group)); 101 this.consumer = new KafkaConsumer<>(consumerProps); 102 103 } 104 105 public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(KafkaNamespace ns, 106 Collection<LogPartition> partitions, String group, Properties consumerProps) { 107 KafkaLogTailer<M> ret = new KafkaLogTailer<>(ns, group, consumerProps); 108 ret.id = buildId(ret.group, partitions); 109 ret.partitions = partitions; 110 ret.topicPartitions = partitions.stream() 111 .map(partition -> new TopicPartition(ns.getTopicName(partition.name()), 112 partition.partition())) 113 .collect(Collectors.toList()); 114 ret.consumer.assign(ret.topicPartitions); 115 log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns)); 116 return ret; 117 } 118 119 public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(KafkaNamespace ns, 120 Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) { 121 KafkaLogTailer<M> ret = new KafkaLogTailer<>(ns, group, consumerProps); 122 ret.id = buildSubscribeId(ret.group, names); 123 ret.names = names; 124 Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList()); 125 ret.listener = listener; 126 ret.consumer.subscribe(topics, ret); 127 ret.partitions = Collections.emptyList(); 128 ret.topicPartitions = Collections.emptyList(); 129 log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns)); 130 return ret; 131 } 132 133 protected static String buildId(String group, Collection<LogPartition> partitions) { 134 return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|")); 135 } 136 137 protected static String buildSubscribeId(String group, Collection<String> names) { 138 return group + ":" + names.stream().collect(Collectors.joining("|")); 139 } 140 141 @Override 142 public LogRecord<M> read(Duration timeout) throws InterruptedException { 143 if (closed) { 144 throw new IllegalStateException("The tailer has been closed."); 145 } 146 if (records.isEmpty()) { 147 int items = poll(timeout); 148 if (isRebalanced) { 149 isRebalanced = false; 150 log.debug("Rebalance happens during poll, raising exception"); 151 throw new RebalanceException("Partitions has been rebalanced"); 152 } 153 if (items == 0) { 154 if (log.isTraceEnabled()) { 155 log.trace("No data " + id + " after " + timeout.toMillis() + " ms"); 156 } 157 return null; 158 } 159 } 160 ConsumerRecord<String, Bytes> record = records.poll(); 161 lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); 162 M value = messageOf(record.value()); 163 LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition()); 164 LogOffset offset = new LogOffsetImpl(partition, record.offset()); 165 consumerMoved = false; 166 if (log.isDebugEnabled()) { 167 log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value)); 168 } 169 return new LogRecord<>(value, offset); 170 } 171 172 @SuppressWarnings("unchecked") 173 protected M messageOf(Bytes value) { 174 // TODO: switch to commons-lang3 SerializationUtils 175 ByteArrayInputStream bis = new ByteArrayInputStream(value.get()); 176 ObjectInput in = null; 177 try { 178 in = new ObjectInputStream(bis); 179 return (M) in.readObject(); 180 } catch (IOException | ClassNotFoundException e) { 181 throw new RuntimeException(e); 182 } finally { 183 try { 184 if (in != null) { 185 in.close(); 186 } 187 } catch (IOException ex) { 188 // ignore close exception 189 } 190 } 191 } 192 193 protected int poll(Duration timeout) throws InterruptedException { 194 records.clear(); 195 try { 196 for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) { 197 if (log.isDebugEnabled() && records.isEmpty()) { 198 log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+" 199 + record.offset()); 200 } 201 records.add(record); 202 } 203 } catch (org.apache.kafka.common.errors.InterruptException e) { 204 // the thread is already interrupted 205 throw new InterruptedException(e.getMessage()); 206 } catch (WakeupException e) { 207 log.debug("Receiving wakeup from another thread to close the tailer"); 208 close(); 209 throw new IllegalStateException("poll interrupted because tailer has been closed"); 210 } 211 if (log.isDebugEnabled()) { 212 String msg = "Polling " + id + " returns " + records.size() + " records"; 213 if (records.size() > 0) { 214 log.debug(msg); 215 } else { 216 log.trace(msg); 217 } 218 } 219 return records.size(); 220 } 221 222 @Override 223 public void toEnd() { 224 log.debug("toEnd: " + id); 225 lastOffsets.clear(); 226 records.clear(); 227 consumer.seekToEnd(Collections.emptyList()); 228 consumerMoved = true; 229 } 230 231 @Override 232 public void toStart() { 233 log.debug("toStart: " + id); 234 lastOffsets.clear(); 235 records.clear(); 236 consumer.seekToBeginning(Collections.emptyList()); 237 consumerMoved = true; 238 } 239 240 @Override 241 public void toLastCommitted() { 242 if (log.isDebugEnabled()) { 243 log.debug("toLastCommitted tailer: " + id); 244 } 245 String msg = consumer.assignment() 246 .stream() 247 .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(), 248 toLastCommitted(tp))) 249 .collect(Collectors.joining("|")); 250 if (msg.length() > 0) { 251 if (log.isInfoEnabled()) { 252 log.info("toLastCommitted offsets: " + group + ":" + msg); 253 } 254 } 255 lastOffsets.clear(); 256 records.clear(); 257 consumerMoved = false; 258 } 259 260 protected long toLastCommitted(TopicPartition topicPartition) { 261 Long offset = lastCommittedOffsets.get(topicPartition); 262 if (offset == null) { 263 OffsetAndMetadata offsetMeta = consumer.committed(topicPartition); 264 if (offsetMeta != null) { 265 offset = offsetMeta.offset(); 266 } 267 } 268 if (offset != null) { 269 consumer.seek(topicPartition, offset); 270 } else { 271 consumer.seekToBeginning(Collections.singletonList(topicPartition)); 272 offset = consumer.position(topicPartition); 273 } 274 lastCommittedOffsets.put(topicPartition,offset); 275 if (log.isDebugEnabled()) { 276 log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()), 277 topicPartition.partition(), offset)); 278 } 279 return offset; 280 } 281 282 @Override 283 public void seek(LogOffset offset) { 284 log.debug("Seek to: " + offset.offset() + " from tailer: " + id); 285 TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()), 286 offset.partition().partition()); 287 consumer.seek(topicPartition, offset.offset()); 288 lastOffsets.remove(topicPartition); 289 int partition = topicPartition.partition(); 290 records.removeIf(rec -> rec.partition() == partition); 291 consumerMoved = true; 292 } 293 294 @Override 295 public void reset() { 296 // we just commit the first offset 297 log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id); 298 Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions); 299 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 300 beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset))); 301 consumer.commitSync(offsetToCommit); 302 lastCommittedOffsets.clear(); 303 toLastCommitted(); 304 } 305 306 @Override 307 public void reset(LogPartition partition) { 308 log.info("Reset committed offset for partition: " + partition + " tailer: " + id); 309 TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition()); 310 Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition)); 311 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 312 beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset))); 313 consumer.commitSync(offsetToCommit); 314 lastCommittedOffsets.remove(topicPartition); 315 seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition))); 316 } 317 318 @Override 319 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 320 TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition()); 321 Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes( 322 Collections.singletonMap(topicPartition, timestamp)); 323 if (offsetsForTimes.size() == 1) { 324 OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition); 325 if (offsetAndTimestamp != null) { 326 return new LogOffsetImpl(partition, offsetAndTimestamp.offset()); 327 } 328 } 329 return null; 330 } 331 332 @Override 333 public void commit() { 334 if (consumerMoved) { 335 forceCommit(); 336 return; 337 } 338 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 339 lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1))); 340 lastOffsets.clear(); 341 if (offsetToCommit.isEmpty()) { 342 return; 343 } 344 consumer.commitSync(offsetToCommit); 345 offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset())); 346 if (log.isDebugEnabled()) { 347 String msg = offsetToCommit.entrySet() 348 .stream() 349 .map(entry -> String.format("%s-%02d:+%d", 350 ns.getLogName(entry.getKey().topic()), entry.getKey().partition(), 351 entry.getValue().offset())) 352 .collect(Collectors.joining("|")); 353 log.debug("Committed offsets " + group + ":" + msg); 354 } 355 } 356 357 /** 358 * Commits the consumer at its current position regardless of lastOffsets or lastCommittedOffsets 359 */ 360 protected void forceCommit() { 361 log.info("Force commit after a move"); 362 363 Map<TopicPartition, OffsetAndMetadata> offsets = 364 topicPartitions.stream().collect(toMap(tp -> tp, tp -> new OffsetAndMetadata(consumer.position(tp)))); 365 consumer.commitSync(offsets); 366 offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset())); 367 consumerMoved = false; 368 lastOffsets.clear(); 369 } 370 371 @Override 372 public LogOffset commit(LogPartition partition) { 373 TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition()); 374 Long offset = lastOffsets.get(topicPartition); 375 if (offset == null) { 376 if (log.isDebugEnabled()) { 377 log.debug("unchanged partition, nothing to commit: " + partition); 378 } 379 return null; 380 } 381 offset += 1; 382 consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset))); 383 LogOffset ret = new LogOffsetImpl(partition, offset); 384 if (log.isInfoEnabled()) { 385 log.info("Committed: " + offset + "/" + group); 386 } 387 return ret; 388 } 389 390 @Override 391 public Collection<LogPartition> assignments() { 392 return partitions; 393 } 394 395 @Override 396 public String group() { 397 return group; 398 } 399 400 @Override 401 public boolean closed() { 402 return closed; 403 } 404 405 @Override 406 public void close() { 407 if (consumer != null) { 408 log.info("Closing tailer: " + id); 409 try { 410 // calling wakeup enable to terminate consumer blocking on poll call 411 consumer.close(); 412 } catch (ConcurrentModificationException e) { 413 // closing from another thread raise this exception, try to wakeup the owner 414 log.info("Closing tailer from another thread, send wakeup"); 415 consumer.wakeup(); 416 return; 417 } catch (InterruptException | IllegalStateException e) { 418 // this happens if the consumer has already been closed or if it is closed from another 419 // thread. 420 log.warn("Discard error while closing consumer: ", e); 421 } catch (Throwable t) { 422 log.error("interrupted", t); 423 } 424 consumer = null; 425 } 426 closed = true; 427 } 428 429 @Override 430 public String toString() { 431 return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + '}'; 432 } 433 434 @Override 435 public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 436 Collection<LogPartition> revoked = partitions.stream() 437 .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), 438 tp.partition())) 439 .collect(Collectors.toList()); 440 log.info(String.format("Rebalance revoked: %s", revoked)); 441 id += "-revoked"; 442 if (listener != null) { 443 listener.onPartitionsRevoked(revoked); 444 } 445 } 446 447 @Override 448 public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) { 449 partitions = newPartitions.stream() 450 .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition())) 451 .collect(Collectors.toList()); 452 topicPartitions = newPartitions; 453 id = buildId(group, partitions); 454 lastCommittedOffsets.clear(); 455 lastOffsets.clear(); 456 records.clear(); 457 isRebalanced = true; 458 log.info(String.format("Rebalance assigned: %s", partitions)); 459 if (listener != null) { 460 listener.onPartitionsAssigned(partitions); 461 } 462 } 463 464}