/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;


/**
 * {@link MQTailer} implementation that reads records from Kafka topics through a {@link KafkaConsumer}.
 * <p>
 * A tailer is created either with an explicit partition assignment
 * ({@link #createAndAssign(String, Collection, String, Properties)}) or with a topic subscription
 * ({@link #createAndSubscribe(String, Collection, String, Properties, MQRebalanceListener)}). In the second case
 * the tailer registers itself as the consumer's {@link ConsumerRebalanceListener} and forwards rebalance events
 * to the optional {@link MQRebalanceListener}.
 * <p>
 * Kafka topic names are built as {@code prefix + name}; the prefix is stripped again when converting a topic back
 * to an MQ name.
 *
 * @since 9.2
 */
public class KafkaMQTailer<M extends Externalizable> implements MQTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaMQTailer.class);

    /** Format used in log messages for a partition position: {@code name-NN:+offset}. */
    private static final String OFFSET_FORMAT = "%s-%02d:+%d";

    private final String group;
    private final String prefix;
    private KafkaConsumer<String, Bytes> consumer;
    private String id;
    // only set by createAndAssign, null for a subscribing tailer
    private Collection<TopicPartition> topicPartitions;
    private Collection<MQPartition> partitions;
    // offset of the last record returned by read(), per partition, not yet committed
    private final Map<TopicPartition, Long> lastOffsets = new HashMap<>();
    // cache of the offsets committed by this tailer, used by toLastCommitted()
    private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
    // records fetched by the last poll and not yet returned by read()
    private final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<>();
    private boolean closed = false;
    // only set by createAndSubscribe
    private Collection<String> names;
    private MQRebalanceListener listener;
    // raised by onPartitionsAssigned, consumed by read()
    private boolean isRebalanced = false;

    /**
     * Creates the underlying Kafka consumer for the given consumer group.
     *
     * @param prefix prefix prepended to MQ names to build Kafka topic names
     * @param group the consumer group, must not be {@code null}
     * @param consumerProps the Kafka consumer configuration; it is copied, the group id is set on the copy
     * @throws NullPointerException if {@code group} is {@code null}
     */
    protected KafkaMQTailer(String prefix, String group, Properties consumerProps) {
        Objects.requireNonNull(group, "group");
        this.prefix = prefix;
        this.group = group;
        // work on a copy so that the caller's Properties instance is left untouched
        Properties props = (Properties) consumerProps.clone();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Creates a tailer manually assigned to the given partitions.
     *
     * @param prefix prefix prepended to MQ names to build Kafka topic names
     * @param partitions the MQ partitions to read from
     * @param group the consumer group
     * @param consumerProps the Kafka consumer configuration
     * @return a tailer assigned to {@code partitions}
     */
    public static <M extends Externalizable> KafkaMQTailer<M> createAndAssign(String prefix, Collection<MQPartition> partitions, String group, Properties consumerProps) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<>(prefix, group, consumerProps);
        ret.id = buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream()
                                        .map(partition -> new TopicPartition(prefix + partition.name(),
                                                partition.partition()))
                                        .collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    /**
     * Creates a tailer subscribed to the given MQ names; the tailer's {@link #assignments()} is empty until the
     * consumer reports an assignment through {@link #onPartitionsAssigned(Collection)}.
     *
     * @param prefix prefix prepended to MQ names to build Kafka topic names
     * @param names the MQ names to subscribe to
     * @param group the consumer group
     * @param consumerProps the Kafka consumer configuration
     * @param listener optional listener notified on rebalance, may be {@code null}
     * @return a subscribed tailer
     */
    public static <M extends Externalizable> KafkaMQTailer<M> createAndSubscribe(String prefix, Collection<String> names, String group, Properties consumerProps,
                                                                                 MQRebalanceListener listener) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<>(prefix, group, consumerProps);
        ret.id = buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection<String> topics = names.stream().map(name -> prefix + name).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    /** Builds the tailer id for an assigned tailer: {@code group:partition|partition...}. */
    private static String buildId(String group, Collection<MQPartition> partitions) {
        return group + ":" + partitions.stream().map(MQPartition::toString).collect(Collectors.joining("|"));
    }

    /** Builds the tailer id for a subscribing tailer: {@code group:name|name...}. */
    private static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + String.join("|", names);
    }


    /**
     * Returns the next record, polling Kafka when the local buffer is empty.
     *
     * @param timeout maximum time to wait in the Kafka poll when no record is buffered
     * @return the next record, or {@code null} if the poll returned no record
     * @throws IllegalStateException if the tailer is closed, or gets closed by a wakeup during the poll
     * @throws MQRebalanceException if a partition assignment happened during the poll
     * @throws InterruptedException if the thread is interrupted during the poll
     */
    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (records.isEmpty()) {
            int items = poll(timeout);
            if (isRebalanced) {
                isRebalanced = false;
                log.debug("Rebalance happens during poll, raising exception");
                throw new MQRebalanceException();
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace("No data " + id + " after " + timeout.toMillis() + " ms");
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = records.poll();
        // remember the position so that commit() knows what has been consumed
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = messageOf(record.value());
        MQPartition partition = MQPartition.of(getNameForTopic(record.topic()), record.partition());
        MQOffset offset = new MQOffsetImpl(partition, record.offset());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", offset, group, record.key(), value));
        }
        return new MQRecord<>(partition, value, offset);
    }

    /**
     * Converts a Kafka topic name back to an MQ name by stripping the leading prefix.
     * The prefix is matched literally; a topic that does not start with it is returned unchanged.
     */
    private String getNameForTopic(String topic) {
        return topic.startsWith(prefix) ? topic.substring(prefix.length()) : topic;
    }

    /** Converts an MQ partition to the matching Kafka topic partition. */
    private TopicPartition toTopicPartition(MQPartition partition) {
        return new TopicPartition(prefix + partition.name(), partition.partition());
    }

    /** Converts Kafka topic partitions to MQ partitions. */
    private Collection<MQPartition> toMQPartitions(Collection<TopicPartition> topicPartitions) {
        return topicPartitions.stream()
                              .map(tp -> MQPartition.of(getNameForTopic(tp.topic()), tp.partition()))
                              .collect(Collectors.toList());
    }

    /** Formats a partition position for log messages. */
    private String formatOffset(TopicPartition topicPartition, long offset) {
        return String.format(OFFSET_FORMAT, getNameForTopic(topicPartition.topic()), topicPartition.partition(),
                offset);
    }

    /**
     * Deserializes a record payload into a message.
     *
     * @throws RuntimeException wrapping the {@link IOException} or {@link ClassNotFoundException} raised while
     *             reading the object
     */
    @SuppressWarnings("unchecked")
    private M messageOf(Bytes value) {
        // NOTE(review): this is Java native deserialization of the topic content; it is only safe as long as
        // the topics are written by trusted producers.
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(value.get()))) {
            return (M) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces the content of the local buffer with the result of a Kafka poll.
     *
     * @return the number of records now buffered
     * @throws InterruptedException if the thread is interrupted during the poll
     * @throws IllegalStateException if the consumer received a wakeup, in which case the tailer is closed
     */
    private int poll(Duration timeout) throws InterruptedException {
        records.clear();
        try {
            for (ConsumerRecord<String, Bytes> record : consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && records.isEmpty()) {
                    log.debug("Poll first record: " + getNameForTopic(record.topic()) + ":" + record.partition() + ":+" + record.offset());
                }
                records.add(record);
            }
        } catch (InterruptException e) {
            // translate Kafka's unchecked exception into the checked one, keeping the interrupt flag and cause
            Thread.currentThread().interrupt();
            InterruptedException interrupted = new InterruptedException(e.getMessage());
            interrupted.initCause(e);
            throw interrupted;
        } catch (WakeupException e) {
            // close() invoked from another thread sends a wakeup, finish the close from the owner thread
            log.debug("Receiving wakeup from another thread to close the tailer");
            try {
                close();
            } catch (Exception e1) {
                log.warn("Error while closing the tailer " + this, e1);
            }
            throw new IllegalStateException("poll interrupted because tailer has been closed", e);
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + id + " returns " + records.size() + " records";
            if (records.size() > 0) {
                log.debug(msg);
            } else {
                log.trace(msg);
            }
        }
        return records.size();
    }

    /**
     * Moves to the end of all currently assigned partitions, dropping buffered records and uncommitted
     * positions.
     */
    @Override
    public void toEnd() {
        log.debug("toEnd: " + id);
        lastOffsets.clear();
        records.clear();
        // use the consumer assignment: topicPartitions is not set for a subscribing tailer
        consumer.seekToEnd(consumer.assignment());
    }

    /**
     * Moves to the beginning of all currently assigned partitions, dropping buffered records and uncommitted
     * positions.
     */
    @Override
    public void toStart() {
        log.debug("toStart: " + id);
        lastOffsets.clear();
        records.clear();
        // use the consumer assignment: topicPartitions is not set for a subscribing tailer
        consumer.seekToBeginning(consumer.assignment());
    }

    /**
     * Moves every currently assigned partition to its last committed offset, or to its beginning when nothing
     * has been committed, then drops the buffered records and the cached offsets.
     */
    @Override
    public void toLastCommitted() {
        log.debug("toLastCommitted tailer: " + id);
        StringBuilder msg = new StringBuilder();
        for (TopicPartition tp : consumer.assignment()) {
            if (msg.length() > 0) {
                msg.append('|');
            }
            msg.append(formatOffset(tp, toLastCommitted(tp)));
        }
        if (msg.length() > 0) {
            log.info("toLastCommitted offsets: " + group + ":" + msg);
        }
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
    }

    /**
     * Seeks a partition to its last committed offset, looking first in the local cache then asking Kafka;
     * falls back to the beginning of the partition when there is no committed offset.
     *
     * @return the offset the consumer has been moved to
     */
    private long toLastCommitted(TopicPartition topicPartition) {
        Long offset = lastCommittedOffsets.get(topicPartition);
        if (offset == null) {
            OffsetAndMetadata offsetMeta = consumer.committed(topicPartition);
            if (offsetMeta != null) {
                offset = offsetMeta.offset();
            }
        }
        if (offset != null) {
            consumer.seek(topicPartition, offset);
        } else {
            consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = consumer.position(topicPartition);
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: " + OFFSET_FORMAT, getNameForTopic(topicPartition.topic()),
                    topicPartition.partition(), offset));
        }
        return offset;
    }

    /**
     * Moves the given partition to the given offset and drops all buffered records.
     *
     * @param partition the partition to move
     * @param offset the position to move to
     */
    public void seek(MQPartition partition, MQOffset offset) {
        log.debug("seek tailer: " + id + " +" + offset);
        consumer.seek(toTopicPartition(partition), offset.offset());
        records.clear();
    }

    /**
     * Synchronously commits the position following the last record read on each partition. Does nothing when
     * no record has been read since the previous commit.
     */
    @Override
    public void commit() {
        if (lastOffsets.isEmpty()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<>();
        // the committed offset is the position of the next record to read
        lastOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset + 1)));
        consumer.commitSync(offsetToCommit);
        // forget the pending positions only once the commit succeeded
        lastOffsets.clear();
        offsetToCommit.forEach((topicPartition, offset) -> lastCommittedOffsets.put(topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet()
                                       .stream()
                                       .map(entry -> formatOffset(entry.getKey(), entry.getValue().offset()))
                                       .collect(Collectors.joining("|"));
            log.debug("Committed offsets " + group + ":" + msg);
        }
    }

    /**
     * Synchronously commits the position following the last record read on the given partition.
     *
     * @param partition the partition to commit
     * @return the committed offset, or {@code null} if no record has been read on this partition since the
     *         last call to {@link #commit()}
     */
    @Override
    public MQOffset commit(MQPartition partition) {
        TopicPartition topicPartition = toTopicPartition(partition);
        Long lastOffset = lastOffsets.get(topicPartition);
        if (lastOffset == null) {
            log.debug("unchanged partition, nothing to commit: " + partition);
            return null;
        }
        // the committed offset is the position of the next record to read
        long offset = lastOffset + 1;
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
        // keep the cache used by toLastCommitted() in line with what has just been committed
        lastCommittedOffsets.put(topicPartition, offset);
        MQOffset ret = new MQOffsetImpl(partition, offset);
        if (log.isDebugEnabled()) {
            log.debug("Committed: " + offset + "/" + group);
        }
        return ret;
    }

    /**
     * @return the partitions given at creation for an assigned tailer, or the partitions received on the last
     *         rebalance for a subscribing tailer
     */
    @Override
    public Collection<MQPartition> assignments() {
        return partitions;
    }

    @Override
    public String group() {
        return group;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * Closes the underlying consumer.
     * <p>
     * When the consumer rejects the call with a {@link ConcurrentModificationException}, a wakeup is sent
     * instead and the method returns without marking the tailer as closed: the close is then completed by the
     * thread that is polling, when it receives the wakeup.
     */
    @Override
    public void close() throws Exception {
        if (consumer != null) {
            log.info("Closing tailer: " + id);
            try {
                consumer.close();
            } catch (ConcurrentModificationException e) {
                // closing from another thread raise this exception, try to wakeup the owner
                log.info("Closing tailer from another thread, send wakeup");
                consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e) {
                // this happens if the consumer has already been closed or if it is closed from another
                // thread.
                log.warn("Discard error while closing consumer: ", e);
            } catch (Throwable t) {
                // best effort: the tailer is considered closed whatever the consumer reports
                log.error("Unexpected error while closing consumer", t);
            }
            consumer = null;
        }
        closed = true;
    }

    @Override
    public String toString() {
        return "KafkaMQTailer{" +
                "prefix='" + prefix + '\'' +
                ", id=" + id +
                ", closed=" + closed +
                '}';
    }

    /**
     * Kafka callback: tags the tailer id as revoked and forwards the revoked partitions to the listener, if
     * any.
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection<MQPartition> revoked = toMQPartitions(partitions);
        log.info(String.format("Rebalance revoked: %s", revoked));
        id += "-revoked";
        if (listener != null) {
            listener.onPartitionsRevoked(revoked);
        }
    }

    /**
     * Kafka callback: records the new assignment, resets all local state, flags the rebalance so that the
     * pending {@link #read(Duration)} raises a {@link MQRebalanceException}, and forwards the new partitions
     * to the listener, if any.
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        partitions = toMQPartitions(newPartitions);
        id = buildId(group, partitions);
        lastCommittedOffsets.clear();
        lastOffsets.clear();
        records.clear();
        isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", partitions));
        if (listener != null) {
            listener.onPartitionsAssigned(partitions);
        }
    }


}