001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.Collection; 024 025/** 026 * Sequential reader for a partition or multiple partitions. A tailer is not thread safe and should not be shared by 027 * multiple threads. 028 */ 029public interface LogTailer<M extends Externalizable> extends AutoCloseable { 030 031 /** 032 * Returns the consumer group. 033 */ 034 String group(); 035 036 /** 037 * Returns the list of Log name, partitions tuples currently assigned to this tailer. Assignments can change only if 038 * the tailer has been created using {@link LogManager#subscribe}. 039 */ 040 Collection<LogPartition> assignments(); 041 042 /** 043 * Read a message from assigned partitions within the timeout. 044 * 045 * @return null if there is no message in the queue after the timeout. 046 * @throws RebalanceException if a partition rebalancing happen during the read, this is possible only when using 047 * {@link LogManager#subscribe}. 048 */ 049 LogRecord<M> read(Duration timeout) throws InterruptedException; 050 051 /** 052 * Commit current positions for all partitions (last message offset returned by read). 053 */ 054 void commit(); 055 056 /** 057 * Commit current position for the partition. 058 * 059 * @return the committed offset, can return null if there was no previous read done on this partition. 060 */ 061 LogOffset commit(LogPartition partition); 062 063 /** 064 * Set the current positions to the end of all partitions. 065 */ 066 void toEnd(); 067 068 /** 069 * Set the current positions to the beginning of all partitions. 070 */ 071 void toStart(); 072 073 /** 074 * Set the current positions to previously committed positions. 075 */ 076 void toLastCommitted(); 077 078 /** 079 * Set the current position for a single partition. Do not change other partitions positions. 080 * 081 * @since 9.3 082 */ 083 void seek(LogOffset offset); 084 085 /** 086 * Reset all committed positions for this group, next read will be done from beginning. 087 * 088 * @since 9.3 089 */ 090 void reset(); 091 092 /** 093 * Reset the committed position for this group on this partition, next read for this partition will be done from the 094 * beginning. 095 * 096 * @since 9.3 097 */ 098 void reset(LogPartition partition); 099 100 @Override 101 void close(); 102 103 /** 104 * Returns {@code true} if the tailer has been closed. 105 */ 106 boolean closed(); 107}