001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.kafka; 020 021import static java.util.stream.Collectors.toMap; 022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 023 024import java.io.Externalizable; 025import java.time.Duration; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.ConcurrentModificationException; 029import java.util.HashMap; 030import java.util.LinkedList; 031import java.util.Map; 032import java.util.Objects; 033import java.util.Properties; 034import java.util.Queue; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.stream.Collectors; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.apache.kafka.clients.consumer.CommitFailedException; 041import org.apache.kafka.clients.consumer.ConsumerConfig; 042import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 043import org.apache.kafka.clients.consumer.ConsumerRecord; 044import org.apache.kafka.clients.consumer.KafkaConsumer; 045import org.apache.kafka.clients.consumer.OffsetAndMetadata; 046import org.apache.kafka.clients.consumer.OffsetAndTimestamp; 047import org.apache.kafka.common.TopicPartition; 048import org.apache.kafka.common.errors.InterruptException; 049import org.apache.kafka.common.errors.RebalanceInProgressException; 050import org.apache.kafka.common.errors.WakeupException; 051import org.apache.kafka.common.utils.Bytes; 052import org.nuxeo.lib.stream.codec.Codec; 053import org.nuxeo.lib.stream.codec.SerializableCodec; 054import org.nuxeo.lib.stream.log.LogOffset; 055import org.nuxeo.lib.stream.log.LogPartition; 056import org.nuxeo.lib.stream.log.LogRecord; 057import org.nuxeo.lib.stream.log.LogTailer; 058import org.nuxeo.lib.stream.log.Name; 059import org.nuxeo.lib.stream.log.NameResolver; 060import org.nuxeo.lib.stream.log.RebalanceException; 061import org.nuxeo.lib.stream.log.RebalanceListener; 062import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 063 064/** 065 * @since 9.3 066 */ 067public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener { 068 private static final Log log = LogFactory.getLog(KafkaLogTailer.class); 069 070 protected final Name group; 071 072 protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>(); 073 074 protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>(); 075 076 protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>(); 077 078 protected final Codec<M> codec; 079 080 protected final Codec<M> decodeCodec; 081 082 protected final NameResolver resolver; 083 084 protected KafkaConsumer<String, Bytes> consumer; 085 086 protected String id; 087 088 protected Collection<TopicPartition> topicPartitions; 089 090 protected Collection<LogPartition> partitions; 091 092 // keep track of all tailers on the same namespace index even from different mq 093 protected boolean closed; 094 095 protected Collection<Name> names; 096 097 protected RebalanceListener listener; 098 099 protected boolean isRebalanced; 100 101 protected boolean isRevoked; 102 103 protected boolean isLost; 104 105 protected boolean consumerMoved; 106 107 protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); 108 109 protected long lastPollTimestamp; 110 111 protected int lastPollSize; 112 113 protected KafkaLogTailer(Codec<M> codec, NameResolver resolver, Name group, Properties consumerProps) { 114 this.codec = codec; 115 this.resolver = resolver; 116 if (NO_CODEC.equals(codec)) { 117 this.decodeCodec = new SerializableCodec<>(); 118 } else { 119 this.decodeCodec = codec; 120 } 121 Objects.requireNonNull(group); 122 this.group = group; 123 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, resolver.getId(group)); 124 consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, 125 resolver.getId(group) + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement()); 126 this.consumer = new KafkaConsumer<>(consumerProps); 127 } 128 129 @SuppressWarnings("squid:S2095") 130 public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, 131 NameResolver resolver, Collection<LogPartition> partitions, Name group, Properties consumerProps) { 132 KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, resolver, group, consumerProps); 133 ret.id = buildId(ret.group, partitions); 134 ret.partitions = partitions; 135 ret.topicPartitions = partitions.stream() 136 .map(partition -> new TopicPartition(resolver.getId(partition.name()), 137 partition.partition())) 138 .collect(Collectors.toList()); 139 ret.consumer.assign(ret.topicPartitions); 140 log.debug(String.format("Created tailer with assignments: %s", ret.id)); 141 return ret; 142 } 143 144 @SuppressWarnings("squid:S2095") 145 public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, 146 NameResolver resolver, Collection<Name> names, Name group, Properties consumerProps, 147 RebalanceListener listener) { 148 KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, resolver, group, consumerProps); 149 ret.id = buildSubscribeId(ret.group, names); 150 ret.names = names; 151 Collection<String> topics = names.stream().map(resolver::getId).collect(Collectors.toList()); 152 ret.listener = listener; 153 ret.consumer.subscribe(topics, ret); 154 ret.partitions = Collections.emptyList(); 155 ret.topicPartitions = Collections.emptyList(); 156 log.debug(String.format("Created tailer with subscription: %s", ret.id)); 157 return ret; 158 } 159 160 protected static String buildId(Name group, Collection<LogPartition> partitions) { 161 return group.getId() + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|")); 162 } 163 164 protected static String buildSubscribeId(Name group, Collection<Name> names) { 165 return group.getId() + ":" 166 + names.stream().map(Name::getId).collect(Collectors.joining("|")); 167 } 168 169 @Override 170 public LogRecord<M> read(Duration timeout) throws InterruptedException { 171 if (closed) { 172 throw new IllegalStateException("The tailer has been closed."); 173 } 174 if (records.isEmpty()) { 175 int items = poll(timeout); 176 if (isRebalanced || isRevoked || isLost) { 177 if (isRebalanced) { 178 log.debug("Rebalance happens during poll, raising exception"); 179 isRebalanced = false; 180 } else { 181 log.warn("Incomplete rebalance during poll, raising exception, revoked: " + isRevoked + ", lost: " 182 + isLost); 183 isRevoked = isLost = false; 184 } 185 throw new RebalanceException("Partitions has been rebalanced"); 186 } 187 if (items == 0) { 188 if (log.isTraceEnabled()) { 189 log.trace("No data " + id + " after " + timeout.toMillis() + " ms"); 190 } 191 return null; 192 } 193 } 194 ConsumerRecord<String, Bytes> record = records.poll(); 195 lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); 196 M value = decodeCodec.decode(record.value().get()); 197 LogPartition partition = LogPartition.of(resolver.getName(record.topic()), record.partition()); 198 LogOffset offset = new LogOffsetImpl(partition, record.offset()); 199 consumerMoved = false; 200 if (log.isDebugEnabled()) { 201 log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value)); 202 } 203 return new LogRecord<>(value, offset); 204 } 205 206 protected int poll(Duration timeout) throws InterruptedException { 207 records.clear(); 208 try { 209 lastPollTimestamp = System.currentTimeMillis(); 210 for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout)) { 211 if (log.isDebugEnabled() && records.isEmpty()) { 212 log.debug("Poll first record: " + resolver.getName(record.topic()).getUrn() + ":" 213 + record.partition() + ":+" 214 + record.offset()); 215 } 216 records.add(record); 217 } 218 } catch (org.apache.kafka.common.errors.InterruptException e) { 219 // the thread is already interrupted 220 throw new InterruptedException(e.getMessage()); 221 } catch (WakeupException e) { 222 log.debug("Receiving wakeup from another thread to close the tailer"); 223 close(); 224 throw new IllegalStateException("poll interrupted because tailer has been closed"); 225 } 226 if (log.isDebugEnabled()) { 227 String msg = "Polling " + id + " returns " + records.size() + " records"; 228 if (records.isEmpty()) { 229 log.trace(msg); 230 } else { 231 log.debug(msg); 232 } 233 } 234 lastPollSize = records.size(); 235 return records.size(); 236 } 237 238 @Override 239 public void toEnd() { 240 log.debug("toEnd: " + id); 241 lastOffsets.clear(); 242 records.clear(); 243 consumer.seekToEnd(Collections.emptyList()); 244 consumerMoved = true; 245 } 246 247 @Override 248 public void toStart() { 249 log.debug("toStart: " + id); 250 lastOffsets.clear(); 251 records.clear(); 252 consumer.seekToBeginning(Collections.emptyList()); 253 consumerMoved = true; 254 } 255 256 @Override 257 public void toLastCommitted() { 258 if (log.isDebugEnabled()) { 259 log.debug("toLastCommitted tailer: " + id); 260 } 261 String msg = consumer.assignment() 262 .stream() 263 .map(tp -> String.format("%s-%02d:+%d", resolver.getName(tp.topic()).getUrn(), 264 tp.partition(), 265 toLastCommitted(tp))) 266 .collect(Collectors.joining("|")); 267 if (msg.length() > 0 && log.isInfoEnabled()) { 268 log.info("toLastCommitted offsets: " + group + ":" + msg); 269 } 270 lastOffsets.clear(); 271 records.clear(); 272 consumerMoved = false; 273 } 274 275 protected long toLastCommitted(TopicPartition topicPartition) { 276 Long offset = lastCommittedOffsets.get(topicPartition); 277 if (offset == null) { 278 Map<TopicPartition, OffsetAndMetadata> offsetMeta = consumer.committed( 279 Collections.singleton(topicPartition)); 280 if (offsetMeta != null && offsetMeta.get(topicPartition) != null) { 281 offset = offsetMeta.get(topicPartition).offset(); 282 } 283 } 284 if (offset != null) { 285 consumer.seek(topicPartition, offset); 286 } else { 287 consumer.seekToBeginning(Collections.singletonList(topicPartition)); 288 offset = consumer.position(topicPartition); 289 } 290 lastCommittedOffsets.put(topicPartition, offset); 291 if (log.isDebugEnabled()) { 292 log.debug(String.format(" toLastCommitted: %s-%02d:+%d", resolver.getName(topicPartition.topic()).getUrn(), 293 topicPartition.partition(), offset)); 294 } 295 return offset; 296 } 297 298 @Override 299 public void seek(LogOffset offset) { 300 log.debug("Seek to: " + offset.offset() + " from tailer: " + id); 301 TopicPartition topicPartition = new TopicPartition(resolver.getId(offset.partition().name()), 302 offset.partition().partition()); 303 consumer.seek(topicPartition, offset.offset()); 304 lastOffsets.remove(topicPartition); 305 int partition = topicPartition.partition(); 306 records.removeIf(rec -> rec.partition() == partition); 307 consumerMoved = true; 308 } 309 310 @Override 311 public void reset() { 312 // we just commit the first offset 313 log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id); 314 Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions); 315 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 316 beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset))); 317 consumer.commitSync(offsetToCommit); 318 lastCommittedOffsets.clear(); 319 toLastCommitted(); 320 } 321 322 @Override 323 public void reset(LogPartition partition) { 324 log.info("Reset committed offset for partition: " + partition + " tailer: " + id); 325 TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition()); 326 Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition)); 327 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 328 beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset))); 329 consumer.commitSync(offsetToCommit); 330 lastCommittedOffsets.remove(topicPartition); 331 seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition))); 332 } 333 334 @Override 335 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 336 TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition()); 337 Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes( 338 Collections.singletonMap(topicPartition, timestamp)); 339 if (offsetsForTimes.size() == 1) { 340 OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition); 341 if (offsetAndTimestamp != null) { 342 return new LogOffsetImpl(partition, offsetAndTimestamp.offset()); 343 } 344 } 345 return null; 346 } 347 348 @Override 349 public void commit() { 350 if (consumerMoved) { 351 forceCommit(); 352 return; 353 } 354 Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>(); 355 lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1))); 356 lastOffsets.clear(); 357 if (offsetToCommit.isEmpty()) { 358 return; 359 } 360 try { 361 consumer.commitSync(offsetToCommit); 362 } catch (CommitFailedException | RebalanceInProgressException e) { 363 log.error("Fail to commit " + offsetToCommit + " assigned " + assignments() + " last committed: " 364 + lastCommittedOffsets + " last size: " + lastPollSize + " elapsed: " 365 + (System.currentTimeMillis() - lastPollTimestamp) + " ms, records polled: " + records.size()); 366 throw e; 367 } 368 offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset())); 369 if (log.isDebugEnabled()) { 370 String msg = offsetToCommit.entrySet() 371 .stream() 372 .map(entry -> String.format("%s-%02d:+%d", 373 resolver.getName(entry.getKey().topic()).getUrn(), 374 entry.getKey().partition(), entry.getValue().offset())) 375 .collect(Collectors.joining("|")); 376 log.debug("Committed offsets " + group + ":" + msg); 377 } 378 } 379 380 /** 381 * Commits the consumer at its current position regardless of lastOffsets or lastCommittedOffsets 382 */ 383 protected void forceCommit() { 384 log.info("Force commit after a move"); 385 386 Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream() 387 .collect(toMap(tp -> tp, 388 tp -> new OffsetAndMetadata( 389 consumer.position(tp)))); 390 consumer.commitSync(offsets); 391 offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset())); 392 consumerMoved = false; 393 lastOffsets.clear(); 394 } 395 396 @Override 397 public LogOffset commit(LogPartition partition) { 398 TopicPartition topicPartition = new TopicPartition(resolver.getId(partition.name()), partition.partition()); 399 Long offset = lastOffsets.get(topicPartition); 400 if (offset == null) { 401 if (log.isDebugEnabled()) { 402 log.debug("unchanged partition, nothing to commit: " + partition); 403 } 404 return null; 405 } 406 offset += 1; 407 consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset))); 408 LogOffset ret = new LogOffsetImpl(partition, offset); 409 if (log.isInfoEnabled()) { 410 log.info("Committed: " + offset + "/" + group); 411 } 412 return ret; 413 } 414 415 @Override 416 public Collection<LogPartition> assignments() { 417 return partitions; 418 } 419 420 @Override 421 public Name group() { 422 return group; 423 } 424 425 @Override 426 public boolean closed() { 427 return closed; 428 } 429 430 @Override 431 public Codec<M> getCodec() { 432 return codec; 433 } 434 435 @SuppressWarnings("squid:S1181") 436 @Override 437 public void close() { 438 if (consumer != null) { 439 log.debug("Closing tailer: " + id); 440 try { 441 // calling wakeup enable to terminate consumer blocking on poll call 442 consumer.close(); 443 } catch (ConcurrentModificationException e) { 444 // closing from another thread raise this exception, try to wakeup the owner 445 log.info("Closing tailer from another thread, send wakeup"); 446 consumer.wakeup(); 447 return; 448 } catch (InterruptException | IllegalStateException e) { 449 // this happens if the consumer has already been closed or if it is closed from another 450 // thread. 451 log.info("Discard error while closing consumer: ", e); 452 } catch (Throwable t) { 453 log.error("interrupted", t); 454 } 455 consumer = null; 456 } 457 closed = true; 458 } 459 460 @Override 461 public String toString() { 462 return "KafkaLogTailer{" + "id=" + id + ", closed=" + closed + ", codec=" + codec + '}'; 463 } 464 465 @Override 466 public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 467 Collection<LogPartition> revoked = partitions.stream() 468 .map(tp -> LogPartition.of(resolver.getName(tp.topic()), 469 tp.partition())) 470 .collect(Collectors.toList()); 471 log.info(String.format("Rebalance revoked: %s", revoked)); 472 cleanDuringRebalancing(); 473 isRevoked = true; 474 id += "-revoked"; 475 if (listener != null) { 476 listener.onPartitionsRevoked(revoked); 477 } 478 } 479 480 @Override 481 public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) { 482 partitions = newPartitions.stream() 483 .map(tp -> LogPartition.of(resolver.getName(tp.topic()), tp.partition())) 484 .collect(Collectors.toList()); 485 topicPartitions = newPartitions; 486 id = buildId(group, partitions); 487 cleanDuringRebalancing(); 488 isRebalanced = true; 489 log.info(String.format("Rebalance assigned: %s", partitions)); 490 if (listener != null) { 491 listener.onPartitionsAssigned(partitions); 492 } 493 } 494 495 @Override 496 public void onPartitionsLost(Collection<TopicPartition> partitions) { 497 Collection<LogPartition> lost = partitions.stream() 498 .map(tp -> LogPartition.of(resolver.getName(tp.topic()), 499 tp.partition())) 500 .collect(Collectors.toList()); 501 log.warn(String.format("Rebalance Partition Lost: %s", lost)); 502 id += "-lost"; 503 cleanDuringRebalancing(); 504 isLost = true; 505 if (listener != null) { 506 listener.onPartitionsLost(lost); 507 } 508 } 509 510 protected void cleanDuringRebalancing() { 511 lastCommittedOffsets.clear(); 512 lastOffsets.clear(); 513 records.clear(); 514 isRebalanced = isRevoked = isLost = false; 515 } 516}