/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.kafka;

import static java.util.stream.Collectors.toMap;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * @since 9.3
 */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);

    protected final String group;

    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<>();

    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();

    protected final KafkaNamespace ns;

    protected final Codec<M> codec;

    protected final Codec<M> decodeCodec;

    protected KafkaConsumer<String, Bytes> consumer;

    protected String id;

    protected Collection<TopicPartition> topicPartitions;

    protected Collection<LogPartition> partitions;

    // keep track of all tailers on the same namespace index even from different mq
    protected boolean closed;

    protected Collection<String> names;

    protected RebalanceListener listener;

    protected boolean isRebalanced;

    protected boolean consumerMoved;
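
    /*
     * A minimal usage sketch (hypothetical caller code; "codec", "ns", "partitions" and "props" are
     * assumed to be provided by the surrounding application, and Record stands for any
     * Externalizable message type):
     *
     * try (LogTailer<Record> tailer = KafkaLogTailer.createAndAssign(codec, ns, partitions, "myGroup", props)) {
     *     LogRecord<Record> rec = tailer.read(Duration.ofSeconds(1));
     *     if (rec != null) {
     *         // process the record ...
     *         tailer.commit();
     *     }
     * }
     */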

    protected KafkaLogTailer(Codec<M> codec, KafkaNamespace ns, String group, Properties consumerProps) {
        this.codec = codec;
        if (NO_CODEC.equals(codec)) {
            this.decodeCodec = new SerializableCodec<>();
        } else {
            this.decodeCodec = codec;
        }
        Objects.requireNonNull(group);
        this.ns = ns;
        this.group = group;
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ns.getKafkaGroup(group));
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, KafkaNamespace ns,
            Collection<LogPartition> partitions, String group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(ns.getTopicName(partition.name()),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    @SuppressWarnings("squid:S2095")
    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, KafkaNamespace ns,
            Collection<String> names, String group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<>(codec, ns, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(ns::getTopicName).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, ns));
        return ret;
    }

    protected static String buildId(String group, Collection<LogPartition> partitions) {
        return group + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happened during poll, raising exception");
                throw new RebalanceException("Partitions have been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = decodeCodec.decode(record.value().get());
        LogPartition partition = LogPartition.of(ns.getLogName(record.topic()), record.partition());
        LogOffset offset = new LogOffsetImpl(partition, record.offset());
        consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new LogRecord<>(value, offset);
    }
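
    // Note for subscribers (createAndSubscribe): read() throws RebalanceException when the group
    // membership changed during poll(). One possible recovery, sketched with hypothetical caller
    // code, is to reposition on the committed offsets and retry:
    //
    // try {
    //     rec = tailer.read(timeout);
    // } catch (RebalanceException e) {
    //     tailer.toLastCommitted(); // reposition on the newly assigned partitions
    //     rec = tailer.read(timeout);
    // }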

    protected int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + ns.getLogName(record.topic()) + ":" + record.partition() + ":+"
                            + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // the thread is already interrupted
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.isEmpty()) {
                log.trace(msg);
            } else {
                log.debug(msg);
            }
        }
        return records.size();
    }

    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToEnd(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        consumer.seekToBeginning(Collections.emptyList());
        consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + id);
        }
        String msg = consumer.assignment()
                             .stream()
                             .map(tp -> String.format("%s-%02d:+%d", ns.getLogName(tp.topic()), tp.partition(),
                                     toLastCommitted(tp)))
                             .collect(Collectors.joining("|"));
        if (!msg.isEmpty() && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastOffsets.clear();
        records.clear();
        consumerMoved = false;
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", ns.getLogName(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug("Seek to: " + offset.offset() + " from tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(offset.partition().name()),
                offset.partition().partition());
        consumer.seek(topicPartition, offset.offset());
        lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        records.removeIf(rec -> rec.partition() == partition);
        consumerMoved = true;
    }

    @Override
    public void reset() {
        // Kafka offers no way to drop the committed offsets of a live group,
        // so resetting is done by committing the beginning offset of each assigned partition
        log.info("Reset committed offsets for all assigned partitions: " + topicPartitions + " tailer: " + id);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.clear();
        toLastCommitted();
    }
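
    // Time-based replay sketch (hypothetical caller code, using offsetForTimestamp below):
    // replays everything appended since "timestamp" on a given partition.
    //
    // LogOffset offset = tailer.offsetForTimestamp(partition, timestamp);
    // if (offset != null) {
    //     tailer.seek(offset);
    // }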

    @Override
    public void reset(LogPartition partition) {
        log.info("Reset committed offset for partition: " + partition + " tailer: " + id);
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(topicPartition));
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(offsetToCommit);
        lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(partition, beginningOffsets.get(topicPartition)));
    }

    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(
                Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1) {
            OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
            if (offsetAndTimestamp != null) {
                return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
            }
        }
        return null;
    }

    @Override
    public void commit() {
        if (consumerMoved) {
            forceCommit();
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> String.format("%s-%02d:+%d", ns.getLogName(entry.getKey().topic()),
                                               entry.getKey().partition(), entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Commits the consumer at its current position regardless of lastOffsets or lastCommittedOffsets.
     */
    protected void forceCommit() {
        log.info("Force commit after a move");
        Map<TopicPartition, OffsetAndMetadata> offsets = topicPartitions.stream().collect(
                toMap(tp -> tp, tp -> new OffsetAndMetadata(consumer.position(tp))));
        consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        consumerMoved = false;
        lastOffsets.clear();
    }
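
    // Kafka stores a committed offset as the position of the next record to consume, which is why
    // commit() above and commit(LogPartition) below commit lastOffset + 1:
    //
    //   committed = lastReadOffset + 1   // e.g. after reading offset 41, commit 42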

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(ns.getTopicName(partition.name()), partition.partition());
        Long offset = lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug("Unchanged partition, nothing to commit: " + partition);
            }
            return null;
        }
        offset += 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        LogOffset ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    @SuppressWarnings("squid:S1181")
    @Override
    public void close() {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                // calling wakeup enables terminating a consumer blocked in a poll call
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raises this exception; try to wake up the owner
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from
                // another thread
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaLogTailer{" + "ns='" + ns + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" + codec + '}';
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<LogPartition> revoked = partitions.stream()
                                                     .map(tp -> LogPartition.of(ns.getLogName(tp.topic()),
                                                             tp.partition()))
                                                     .collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = newPartitions.stream()
                                  .map(tp -> LogPartition.of(ns.getLogName(tp.topic()), tp.partition()))
                                  .collect(Collectors.toList());
        topicPartitions = newPartitions;
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }
}