001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.Collection; 024 025import org.nuxeo.lib.stream.codec.Codec; 026 027/** 028 * Sequential reader for a partition or multiple partitions. A tailer is not thread safe and should not be shared by 029 * multiple threads. 030 */ 031public interface LogTailer<M extends Externalizable> extends AutoCloseable { 032 033 /** 034 * Returns the consumer group. 035 */ 036 Name group(); 037 038 /** 039 * Returns the list of Log name, partitions tuples currently assigned to this tailer. Assignments can change only if 040 * the tailer has been created using {@link LogManager#subscribe}. 041 */ 042 Collection<LogPartition> assignments(); 043 044 /** 045 * Read a message from assigned partitions within the timeout. 046 * 047 * @return null if there is no message in the queue after the timeout. 048 * @throws RebalanceException if a partition rebalancing happen during the read, this is possible only when using 049 * {@link LogManager#subscribe}. 050 */ 051 LogRecord<M> read(Duration timeout) throws InterruptedException; 052 053 /** 054 * Commit current positions for all partitions (last message offset returned by read). 055 */ 056 void commit(); 057 058 /** 059 * Commit current position for the partition. 060 * 061 * @return the committed offset, can return null if there was no previous read done on this partition. 062 */ 063 LogOffset commit(LogPartition partition); 064 065 /** 066 * Set the current positions to the end of all partitions. 067 */ 068 void toEnd(); 069 070 /** 071 * Set the current positions to the beginning of all partitions. 072 */ 073 void toStart(); 074 075 /** 076 * Set the current positions to previously committed positions. 077 */ 078 void toLastCommitted(); 079 080 /** 081 * Set the current position for a single partition. Do not change other partitions positions. 082 * 083 * @since 9.3 084 */ 085 void seek(LogOffset offset); 086 087 /** 088 * Look up the offset for the given partition by timestamp. The position is the earliest offset whose timestamp is 089 * greater than or equal to the given timestamp. 090 * <p> 091 * The timestamp used depends on the implementation, for Kafka this is the LogAppendTime. Returns null if no record 092 * offset is found with an appropriate timestamp. 093 * 094 * @since 10.1 095 */ 096 LogOffset offsetForTimestamp(LogPartition partition, long timestamp); 097 098 /** 099 * Reset all committed positions for this group, next read will be done from beginning. 100 * 101 * @since 9.3 102 */ 103 void reset(); 104 105 /** 106 * Reset the committed position for this group on this partition, next read for this partition will be done from the 107 * beginning. 108 * 109 * @since 9.3 110 */ 111 void reset(LogPartition partition); 112 113 @Override 114 void close(); 115 116 /** 117 * Returns {@code true} if the tailer has been closed. 118 */ 119 boolean closed(); 120 121 /** 122 * Returns the codec used to read the records. A null codec is the default legacy encoding. 123 * 124 * @since 10.2 125 */ 126 Codec<M> getCodec(); 127}