/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Kafka-based implementation of a {@link LogTailer}.
 *
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final KafkaNamespace ns;

    protected final Codec<M> codec;

    protected final Codec<M> decodeCodec;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // keep track of all tailers on the same namespace index, even from different MQs
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean consumerMoved;

    protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    protected KafkaLogTailer(Codec<M> codec, KafkaNamespace ns, String group, Properties consumerProps) {
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            this.decodeCodec = new SerializableCodec<>();
        } else {
            this.decodeCodec = codec;
        }
        Objects.requireNonNull(group);
        this.ns = ns;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,
                group + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.consumer = new KafkaConsumer<>(consumerProps);
    }
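    /**
     * Creates a tailer explicitly assigned to the given partitions, bypassing consumer-group rebalancing.
     * <p>
     * A minimal usage sketch, assuming an existing {@code KafkaNamespace} instance {@code ns} and a message
     * class {@code MyMessage implements Externalizable} (both names are illustrative); the consumer
     * properties shown are common settings, not requirements:
     *
     * <pre>{@code
     * Properties props = new Properties();
     * props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     * props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     * LogTailer<MyMessage> tailer = KafkaLogTailer.createAndAssign(codec, ns,
     *         Collections.singletonList(LogPartition.of("myLog", 0)), "myGroup", props);
     * }</pre>
     */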
    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, KafkaNamespace ns,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(ns.getTopicName(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, KafkaNamespace ns,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }
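    /**
     * Reads the next record, waiting at most {@code timeout}: returns {@code null} when nothing is
     * available within the timeout, and throws {@link RebalanceException} when the group partitions were
     * reassigned during the poll (subscribe mode only).
     * <p>
     * A consumption-loop sketch; {@code process} is a hypothetical handler, and after a rebalance the
     * caller must decide where to resume, typically via {@link #toLastCommitted()}:
     *
     * <pre>{@code
     * while (running) {
     *     try {
     *         LogRecord<MyMessage> record = tailer.read(Duration.ofSeconds(1));
     *         if (record != null) {
     *             process(record);
     *             tailer.commit();
     *         }
     *     } catch (RebalanceException e) {
     *         // partition assignment changed; choose where to resume, e.g. tailer.toLastCommitted()
     *     }
     * }
     * }</pre>
     */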
    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = decodeCodec.decode(record.value().get());
        LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }

    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout)) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (msg.length() > 0 && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }
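    /**
     * Resets the committed positions of all assigned partitions by committing their beginning offsets,
     * then seeks back to the last committed position, so the next {@link #read} starts from the oldest
     * retained record.
     */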
    @Override
    public void reset() {
        // we just commit the first offset
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d",
                                               ns.getLogName(entry.getKey().topic()), entry.getKey().partition(),
                                               entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position, regardless of lastOffsets or lastCommittedOffsets.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");
        Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream()
                                                                        .collect(toMap(tp -> tp,
                                                                                tp -> new OffsetAndMetadata(
                                                                                        consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }
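    /**
     * Commits the last read position of a single partition. The committed value is {@code lastOffset + 1}
     * because Kafka stores the offset of the next record to consume, so a later {@link #toLastCommitted()}
     * resumes just after the last record read before the commit.
     */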
    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    @SuppressWarnings("squid:S1181")
    @Override
    public void close() {
        if (consumer != null) {
            log.debug("Closing tailer: " + id);
            try {
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception; calling wakeup lets the owner thread
                // terminate a consumer blocked in a poll call
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is being closed from
                // another thread
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" + codec + '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(ns.getLogName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }
}