001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.function.Function;
028import java.util.stream.Collectors;
029import java.util.stream.IntStream;
030
031import org.nuxeo.lib.stream.codec.Codec;
032
033/**
034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
035 * tailers.
036 *
037 * @since 9.3
038 */
039@SuppressWarnings("unchecked")
040public interface LogManager extends AutoCloseable {
041
042    /**
043     * Returns {@code true} if a Log with this {@code name} exists.
044     */
045    boolean exists(String name);
046
047    /**
048     * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been
049     * created.
050     */
051    boolean createIfNotExists(String name, int size);
052
053    /**
054     * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the
055     * implementation.
056     */
057    boolean delete(String name);
058
059    /**
060     * Returns the number of partition of a Log.
061     *
062     * @since 10.2
063     */
064    int size(String name);
065
066    /**
067     * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread
068     * safe.
069     *
070     * @since 10.2
071     */
072    <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec);
073
074    /**
075     * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode
076     * records. An appender is thread safe.
077     */
078    default <M extends Externalizable> LogAppender<M> getAppender(String name) {
079        return getAppender(name, NO_CODEC);
080    }
081
082    /**
083     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to
084     * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe.
085     *
086     * @since 10.2
087     */
088    <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions,
089            Codec<M> codec);
090
091    /**
092     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that
093     * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread
094     * safe.
095     */
096    default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) {
097        return createTailer(group, partitions, NO_CODEC);
098    }
099
100    /**
101     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the
102     * legacy decoder. A tailer is NOT thread safe.
103     */
104    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
105        return createTailer(group, partition, NO_CODEC);
106    }
107
108    /**
109     * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using
110     * the legacy decoder. A tailer is NOT thread safe.
111     */
112    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
113        return createTailer(group, name, NO_CODEC);
114    }
115
116    /**
117     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to
118     * decode records. A tailer is NOT thread safe.
119     *
120     * @since 10.2
121     */
122    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) {
123        return createTailer(group, Collections.singletonList(partition), codec);
124    }
125
126    /**
127     * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec}
128     * to decode records. A tailer is NOT thread safe.
129     *
130     * @since 10.2
131     */
132    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) {
133        return createTailer(group,
134                IntStream.range(0, size(name)).boxed().map(partition -> new LogPartition(name, partition)).collect(
135                        Collectors.toList()),
136                codec);
137    }
138
139    /**
140     * Returns {@code true} if the Log {@link #subscribe} method is supported.
141     */
142    boolean supportSubscribe();
143
144    /**
145     * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
146     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
147     * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records.
148     * <p/>
149     * A tailer is NOT thread safe.
150     * <p/>
151     * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
152     */
153    <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
154            RebalanceListener listener, Codec<M> codec);
155
156    default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
157            RebalanceListener listener) {
158        return subscribe(group, names, listener, NO_CODEC);
159    }
160
161    /**
162     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
163     * for instance index 0 is lag for partition 0.
164     */
165    List<LogLag> getLagPerPartition(String name, String group);
166
167    /**
168     * Returns the lag between consumer {@code group} and producers for a Log.
169     */
170    default LogLag getLag(String name, String group) {
171        return LogLag.of(getLagPerPartition(name, group));
172    }
173
174    /**
175     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
176     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
177     * <br/>
178     * Two functions need to be provided to extract the timestamp and a key from a record.
179     *
180     * @since 10.2
181     */
182    <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec,
183            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor);
184
185    /**
186     * Returns the latency between consumer {@code group} and producers for a Log.
187     *
188     * @since 10.2
189     */
190    default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec,
191            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
192        return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor));
193    }
194
195    /**
196     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
197     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
198     * <br/>
199     * Two functions need to be provided to extract the timestamp and a key from a record.
200     *
201     * @since 10.1
202     * @deprecated 10.2 use {@link #getLatencyPerPartition(String, String, Codec, Function, Function)} instead.
203     */
204    @Deprecated
205    default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group,
206            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
207        return getLatencyPerPartition(name, group, NO_CODEC, timestampExtractor, keyExtractor);
208    }
209
210    /**
211     * Returns the latency between consumer {@code group} and producers for a Log.
212     *
213     * @since 10.1
214     * @deprecated 10.2 use {@link #getLatency(String, String, Codec, Function, Function)} instead.
215     */
216    @Deprecated
217    default <M extends Externalizable> Latency getLatency(String name, String group,
218            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
219        return getLatency(name, group, NO_CODEC, timestampExtractor, keyExtractor);
220    }
221
222    /**
223     * Returns all the Log names.
224     */
225    List<String> listAll();
226
227    /**
228     * List the consumer groups for a Log.<br/>
229     * Note that for Kafka it returns only consumers that use the subscribe API.
230     */
231    List<String> listConsumerGroups(String name);
232
233    @Override
234    void close();
235}