/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
019package org.nuxeo.lib.stream.log;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.Collection;
024
025/**
026 * Sequential reader for a partition or multiple partitions. A tailer is not thread safe and should not be shared by
027 * multiple threads.
028 */
029public interface LogTailer<M extends Externalizable> extends AutoCloseable {
030
031    /**
032     * Returns the consumer group.
033     */
034    String group();
035
036    /**
037     * Returns the list of Log name, partitions tuples currently assigned to this tailer. Assignments can change only if
038     * the tailer has been created using {@link LogManager#subscribe}.
039     */
040    Collection<LogPartition> assignments();
041
042    /**
043     * Read a message from assigned partitions within the timeout.
044     *
045     * @return null if there is no message in the queue after the timeout.
046     * @throws RebalanceException if a partition rebalancing happen during the read, this is possible only when using
047     *             {@link LogManager#subscribe}.
048     */
049    LogRecord<M> read(Duration timeout) throws InterruptedException;
050
051    /**
052     * Commit current positions for all partitions (last message offset returned by read).
053     */
054    void commit();
055
056    /**
057     * Commit current position for the partition.
058     *
059     * @return the committed offset, can return null if there was no previous read done on this partition.
060     */
061    LogOffset commit(LogPartition partition);
062
063    /**
064     * Set the current positions to the end of all partitions.
065     */
066    void toEnd();
067
068    /**
069     * Set the current positions to the beginning of all partitions.
070     */
071    void toStart();
072
073    /**
074     * Set the current positions to previously committed positions.
075     */
076    void toLastCommitted();
077
078    /**
079     * Set the current position for a single partition. Do not change other partitions positions.
080     *
081     * @since 9.3
082     */
083    void seek(LogOffset offset);
084
085    /**
086     * Reset all committed positions for this group, next read will be done from beginning.
087     *
088     * @since 9.3
089     */
090    void reset();
091
092    /**
093     * Reset the committed position for this group on this partition, next read for this partition will be done from the
094     * beginning.
095     *
096     * @since 9.3
097     */
098    void reset(LogPartition partition);
099
100    @Override
101    void close();
102
103    /**
104     * Returns {@code true} if the tailer has been closed.
105     */
106    boolean closed();
107}