001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.function.Function;
028import java.util.stream.Collectors;
029import java.util.stream.IntStream;
030
031import org.nuxeo.lib.stream.codec.Codec;
032
033/**
034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
035 * tailers.
036 *
037 * @since 9.3
038 */
039@SuppressWarnings("unchecked")
040public interface LogManager extends AutoCloseable {
041
042    /**
043     * Returns {@code true} if a Log with this {@code name} exists.
044     */
045    boolean exists(String name);
046
047    /**
048     * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been
049     * created.
050     */
051    boolean createIfNotExists(String name, int size);
052
053    /**
054     * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the
055     * implementation.
056     */
057    boolean delete(String name);
058
059    /**
060     * Returns the number of partition of a Log.
061     *
062     * @since 10.2
063     */
064    int size(String name);
065
066    /**
067     * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread
068     * safe.
069     *
070     * @since 10.2
071     */
072    <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec);
073
074    /**
075     * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode
076     * records. An appender is thread safe.
077     */
078    default <M extends Externalizable> LogAppender<M> getAppender(String name) {
079        return getAppender(name, NO_CODEC);
080    }
081
082    /**
083     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to
084     * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe.
085     *
086     * @since 10.2
087     */
088    <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions,
089            Codec<M> codec);
090
091    /**
092     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that
093     * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread
094     * safe.
095     */
096    default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) {
097        return createTailer(group, partitions, NO_CODEC);
098    }
099
100    /**
101     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the
102     * legacy decoder. A tailer is NOT thread safe.
103     */
104    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
105        return createTailer(group, partition, NO_CODEC);
106    }
107
108    /**
109     * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using
110     * the legacy decoder. A tailer is NOT thread safe.
111     */
112    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
113        return createTailer(group, name, NO_CODEC);
114    }
115
116    /**
117     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to
118     * decode records. A tailer is NOT thread safe.
119     *
120     * @since 10.2
121     */
122    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) {
123        return createTailer(group, Collections.singletonList(partition), codec);
124    }
125
126    /**
127     * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec}
128     * to decode records. A tailer is NOT thread safe.
129     *
130     * @since 10.2
131     */
132    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) {
133        int partitions = size(name);
134        if (partitions <= 0) {
135            throw new IllegalArgumentException("Log name: " + name + " not found");
136        }
137        return createTailer(group,
138                IntStream.range(0, partitions)
139                         .boxed()
140                         .map(partition -> new LogPartition(name, partition))
141                         .collect(Collectors.toList()),
142                codec);
143    }
144
145    /**
146     * Returns {@code true} if the Log {@link #subscribe} method is supported.
147     */
148    boolean supportSubscribe();
149
150    /**
151     * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
152     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
153     * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records.
154     * <p/>
155     * A tailer is NOT thread safe.
156     * <p/>
157     * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
158     */
159    <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
160            RebalanceListener listener, Codec<M> codec);
161
162    default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
163            RebalanceListener listener) {
164        return subscribe(group, names, listener, NO_CODEC);
165    }
166
167    /**
168     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
169     * for instance index 0 is lag for partition 0.
170     */
171    List<LogLag> getLagPerPartition(String name, String group);
172
173    /**
174     * Returns the lag between consumer {@code group} and producers for a Log.
175     */
176    default LogLag getLag(String name, String group) {
177        return LogLag.of(getLagPerPartition(name, group));
178    }
179
180    /**
181     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
182     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
183     * <br/>
184     * Two functions need to be provided to extract the timestamp and a key from a record.
185     *
186     * @since 10.2
187     */
188    <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec,
189            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor);
190
191    /**
192     * Returns the latency between consumer {@code group} and producers for a Log.
193     *
194     * @since 10.2
195     */
196    default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec,
197            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
198        return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor));
199    }
200
201    /**
202     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
203     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
204     * <br/>
205     * Two functions need to be provided to extract the timestamp and a key from a record.
206     *
207     * @since 10.1
208     * @deprecated 10.2 use {@link #getLatencyPerPartition(String, String, Codec, Function, Function)} instead.
209     */
210    @Deprecated
211    default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group,
212            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
213        return getLatencyPerPartition(name, group, NO_CODEC, timestampExtractor, keyExtractor);
214    }
215
216    /**
217     * Returns the latency between consumer {@code group} and producers for a Log.
218     *
219     * @since 10.1
220     * @deprecated 10.2 use {@link #getLatency(String, String, Codec, Function, Function)} instead.
221     */
222    @Deprecated
223    default <M extends Externalizable> Latency getLatency(String name, String group,
224            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
225        return getLatency(name, group, NO_CODEC, timestampExtractor, keyExtractor);
226    }
227
228    /**
229     * Returns all the Log names.
230     */
231    List<String> listAll();
232
233    /**
234     * List the consumer groups for a Log.<br/>
235     * Note that for Kafka it returns only consumers that use the subscribe API.
236     */
237    List<String> listConsumerGroups(String name);
238
239    @Override
240    void close();
241}