/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;

import org.nuxeo.lib.stream.codec.Codec;

027/**
028 * Sequential reader for a partition or multiple partitions. A tailer is not thread safe and should not be shared by
029 * multiple threads.
030 */
031public interface LogTailer<M extends Externalizable> extends AutoCloseable {
032
033    /**
034     * Returns the consumer group.
035     */
036    Name group();
037
038    /**
039     * Returns the list of Log name, partitions tuples currently assigned to this tailer. Assignments can change only if
040     * the tailer has been created using {@link LogManager#subscribe}.
041     */
042    Collection<LogPartition> assignments();
043
044    /**
045     * Read a message from assigned partitions within the timeout.
046     *
047     * @return null if there is no message in the queue after the timeout.
048     * @throws RebalanceException if a partition rebalancing happen during the read, this is possible only when using
049     *             {@link LogManager#subscribe}.
050     */
051    LogRecord<M> read(Duration timeout) throws InterruptedException;
052
053    /**
054     * Commit current positions for all partitions (last message offset returned by read).
055     */
056    void commit();
057
058    /**
059     * Commit current position for the partition.
060     *
061     * @return the committed offset, can return null if there was no previous read done on this partition.
062     */
063    LogOffset commit(LogPartition partition);
064
065    /**
066     * Set the current positions to the end of all partitions.
067     */
068    void toEnd();
069
070    /**
071     * Set the current positions to the beginning of all partitions.
072     */
073    void toStart();
074
075    /**
076     * Set the current positions to previously committed positions.
077     */
078    void toLastCommitted();
079
080    /**
081     * Set the current position for a single partition. Do not change other partitions positions.
082     *
083     * @since 9.3
084     */
085    void seek(LogOffset offset);
086
087    /**
088     * Look up the offset for the given partition by timestamp. The position is the earliest offset whose timestamp is
089     * greater than or equal to the given timestamp.
090     * <p>
091     * The timestamp used depends on the implementation, for Kafka this is the LogAppendTime. Returns null if no record
092     * offset is found with an appropriate timestamp.
093     *
094     * @since 10.1
095     */
096    LogOffset offsetForTimestamp(LogPartition partition, long timestamp);
097
098    /**
099     * Reset all committed positions for this group, next read will be done from beginning.
100     *
101     * @since 9.3
102     */
103    void reset();
104
105    /**
106     * Reset the committed position for this group on this partition, next read for this partition will be done from the
107     * beginning.
108     *
109     * @since 9.3
110     */
111    void reset(LogPartition partition);
112
113    @Override
114    void close();
115
116    /**
117     * Returns {@code true} if the tailer has been closed.
118     */
119    boolean closed();
120
121    /**
122     * Returns the codec used to read the records. A null codec is the default legacy encoding.
123     *
124     * @since 10.2
125     */
126    Codec<M> getCodec();
127}