001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.Collection;
024
025/**
026 * Sequential reader for a partition or multiple partitions. A tailer is not thread safe and should not be shared by
027 * multiple threads.
028 */
029public interface LogTailer<M extends Externalizable> extends AutoCloseable {
030
031    /**
032     * Returns the consumer group.
033     */
034    String group();
035
036    /**
037     * Returns the list of Log name, partitions tuples currently assigned to this tailer. Assignments can change only if
038     * the tailer has been created using {@link LogManager#subscribe}.
039     */
040    Collection<LogPartition> assignments();
041
042    /**
043     * Read a message from assigned partitions within the timeout.
044     *
045     * @return null if there is no message in the queue after the timeout.
046     * @throws RebalanceException if a partition rebalancing happen during the read, this is possible only when using
047     *             {@link LogManager#subscribe}.
048     */
049    LogRecord<M> read(Duration timeout) throws InterruptedException;
050
051    /**
052     * Commit current positions for all partitions (last message offset returned by read).
053     */
054    void commit();
055
056    /**
057     * Commit current position for the partition.
058     *
059     * @return the committed offset, can return null if there was no previous read done on this partition.
060     */
061    LogOffset commit(LogPartition partition);
062
063    /**
064     * Set the current positions to the end of all partitions.
065     */
066    void toEnd();
067
068    /**
069     * Set the current positions to the beginning of all partitions.
070     */
071    void toStart();
072
073    /**
074     * Set the current positions to previously committed positions.
075     */
076    void toLastCommitted();
077
078    /**
079     * Set the current position for a single partition. Do not change other partitions positions.
080     *
081     * @since 9.3
082     */
083    void seek(LogOffset offset);
084
085    /**
086     * Look up the offset for the given partition by timestamp.
087     * The position is the earliest offset whose timestamp is greater than or equal to the given timestamp.<p/>
088     * The timestamp used depends on the implementation, for Kafka this is the LogAppendTime.
089     * Returns null if no record offset is found with an appropriate timestamp.
090     *
091     * @since 10.1
092     */
093    LogOffset offsetForTimestamp(LogPartition partition, long timestamp);
094
095    /**
096     * Reset all committed positions for this group, next read will be done from beginning.
097     *
098     * @since 9.3
099     */
100    void reset();
101
102    /**
103     * Reset the committed position for this group on this partition, next read for this partition will be done from the
104     * beginning.
105     *
106     * @since 9.3
107     */
108    void reset(LogPartition partition);
109
110    @Override
111    void close();
112
113    /**
114     * Returns {@code true} if the tailer has been closed.
115     */
116    boolean closed();
117}