001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.function.Function;
028import java.util.stream.Collectors;
029import java.util.stream.IntStream;
030
031import org.nuxeo.lib.stream.codec.Codec;
032
033/**
034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
035 * tailers.
036 *
037 * @since 9.3
038 */
039@SuppressWarnings("unchecked")
040public interface LogManager extends AutoCloseable {
041
042    /**
043     * Returns {@code true} if a Log with this {@code name} exists.
044     *
045     * @since 11.1
046     */
047    boolean exists(Name name);
048
049    /**
050     * @deprecated since 11.1 use {@link #exists(Name)} instead
051     */
052    @Deprecated(since = "11.1")
053    default boolean exists(String name) {
054        return exists(Name.ofUrn(name));
055    }
056
057    /**
058     * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been
059     * created.
060     *
061     * @since 11.1
062     */
063    boolean createIfNotExists(Name name, int size);
064
065    /**
066     * @deprecated since 11.1 use {@link #createIfNotExists(Name, int)} instead
067     */
068    @Deprecated(since = "11.1")
069    default boolean createIfNotExists(String name, int size) {
070        return createIfNotExists(Name.ofUrn(name), size);
071    }
072
073    /**
074     * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the
075     * implementation.
076     *
077     * @since 11.1
078     */
079    boolean delete(Name name);
080
081    /**
082     * @deprecated since 11.1 use {@link #delete(Name)} instead
083     */
084    @Deprecated(since = "11.1")
085    default boolean delete(String name) {
086        return delete(Name.ofUrn(name));
087    }
088
089    /**
090     * Returns the number of partition of a Log.
091     *
092     * @since 11.1
093     */
094    int size(Name name);
095
096    /**
097     * Returns the number of partition of a Log.
098     *
099     * @since 10.2
100     * @deprecated since 11.1 use {@link #size(Name)} instead
101     */
102    @Deprecated(since = "11.1")
103    default int size(String name) {
104        return size(Name.ofUrn(name));
105    }
106
107    /**
108     * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread
109     * safe.
110     *
111     * @since 10.2
112     */
113    <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec);
114
115    /**
116     * @deprecated since 11.1 use {@link #getAppender(Name, Codec)} instead
117     */
118    @Deprecated(since = "11.1")
119    default <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec) {
120        return getAppender(Name.ofUrn(name), codec);
121    }
122
123    /**
124     * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode
125     * records. An appender is thread safe.
126     *
127     * @since 11.1
128     */
129    default <M extends Externalizable> LogAppender<M> getAppender(Name name) {
130        return getAppender(name, NO_CODEC);
131    }
132
133    /**
134     * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode
135     * records. An appender is thread safe.
136     *
137     * @since 10.2
138     * @deprecated since 11.1 use {@link #getAppender(Name)} instead
139     */
140    @Deprecated(since = "11.1")
141    default <M extends Externalizable> LogAppender<M> getAppender(String name) {
142        return getAppender(name, NO_CODEC);
143    }
144
145    /**
146     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to
147     * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe.
148     *
149     * @since 11.1
150     */
151    <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions,
152            Codec<M> codec);
153
154    /**
155     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to
156     * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe.
157     *
158     * @since 10.2
159     * @deprecated since 11.1 use {@link #createTailer(Name, Name, Codec)} (Name)} instead
160     */
161    @Deprecated(since = "11.1")
162    default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions,
163            Codec<M> codec) {
164        return createTailer(Name.ofUrn(group), partitions, codec);
165    }
166
167    /**
168     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that
169     * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread
170     * safe.
171     *
172     * @since 11.1
173     */
174    default <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions) {
175        return createTailer(group, partitions, NO_CODEC);
176    }
177
178    /**
179     * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that
180     * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread
181     * safe.
182     *
183     * @since 10.2
184     * @deprecated since 11.1 use {@link #createTailer(Name, Collection) (Name)} instead
185     */
186    @Deprecated(since = "11.1")
187    default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) {
188        return createTailer(Name.ofUrn(group), partitions);
189    }
190
191    /**
192     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the
193     * legacy decoder. A tailer is NOT thread safe.
194     *
195     * @since 11.1
196     */
197    default <M extends Externalizable> LogTailer<M> createTailer(Name group, LogPartition partition) {
198        return createTailer(group, partition, NO_CODEC);
199    }
200
201    /**
202     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the
203     * legacy decoder. A tailer is NOT thread safe.
204     *
205     * @since 10.2
206     * @deprecated since 11.1 use {@link #createTailer(Name, Collection) (Name)} instead
207     */
208    @Deprecated(since = "11.1")
209    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
210        return createTailer(Name.ofUrn(group), partition);
211    }
212
213    /**
214     * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using
215     * the legacy decoder. A tailer is NOT thread safe.
216     *
217     * @since 11.1
218     */
219    default <M extends Externalizable> LogTailer<M> createTailer(Name group, Name name) {
220        return createTailer(group, name, NO_CODEC);
221    }
222
223    /**
224     * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using
225     * the legacy decoder. A tailer is NOT thread safe.
226     *
227     * @since 10.2
228     * @deprecated since 11.1 use {@link #createTailer(Name, Name)} (Name)} instead
229     */
230    @Deprecated(since = "11.1")
231    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
232        return createTailer(Name.ofUrn(group), Name.ofUrn(name));
233    }
234
235    /**
236     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to
237     * decode records. A tailer is NOT thread safe.
238     *
239     * @since 11.1
240     */
241    default <M extends Externalizable> LogTailer<M> createTailer(Name group, LogPartition partition, Codec<M> codec) {
242        return createTailer(group, Collections.singletonList(partition), codec);
243    }
244
245    /**
246     * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to
247     * decode records. A tailer is NOT thread safe.
248     *
249     * @since 10.2
250     * @deprecated since 11.1 use {@link #createTailer(Name, LogPartition, Codec)} instead
251     */
252    @Deprecated(since = "11.1")
253    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) {
254        return createTailer(Name.ofUrn(group), Collections.singletonList(partition), codec);
255    }
256
257    /**
258     * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec}
259     * to decode records. A tailer is NOT thread safe.
260     *
261     * @since 11.1
262     */
263    default <M extends Externalizable> LogTailer<M> createTailer(Name group, Name name, Codec<M> codec) {
264        int partitions = size(name);
265        if (partitions <= 0) {
266            throw new IllegalArgumentException("Log name: " + name + " not found");
267        }
268        return createTailer(group,
269                IntStream.range(0, partitions)
270                         .boxed()
271                         .map(partition -> new LogPartition(name, partition))
272                         .collect(Collectors.toList()),
273                codec);
274    }
275
276    /**
277     * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec}
278     * to decode records. A tailer is NOT thread safe.
279     *
280     * @since 10.2
281     * @deprecated since 11.1 use {@link #createTailer(Name, Name, Codec)} instead
282     */
283    @Deprecated(since = "11.1")
284    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) {
285        return createTailer(Name.ofUrn(group), Name.ofUrn(name), codec);
286    }
287
288    /**
289     * Returns {@code true} if the Log {@link #subscribe} method is supported.
290     */
291    boolean supportSubscribe();
292
293    /**
294     * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
295     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
296     * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records.
297     *
298     * @since 11.1
299     * @implSpec You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
300     * @implNote A tailer is NOT thread safe.
301     */
302    <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names,
303            RebalanceListener listener, Codec<M> codec);
304
305    /**
306     * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
307     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
308     * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records.
309     *
310     * @since 10.2
311     * @deprecated since 11.1 use {@link #subscribe(Name, Collection, RebalanceListener, Codec)} instead
312     * @implSpec You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
313     * @implNote A tailer is NOT thread safe.
314     */
315    @Deprecated(since = "11.1")
316    default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
317            RebalanceListener listener, Codec<M> codec) {
318        return subscribe(Name.ofUrn(group), names.stream().map(Name::ofUrn).collect(Collectors.toList()), listener,
319                codec);
320    }
321
322    // @since 11.1
323    default <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names,
324            RebalanceListener listener) {
325        return subscribe(group, names, listener, NO_CODEC);
326    }
327
328    /**
329     * @since 10.2
330     * @deprecated since 11.1 use {@link #subscribe(Name, Collection, RebalanceListener)} instead
331     */
332    @Deprecated(since = "11.1")
333    default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
334            RebalanceListener listener) {
335        return subscribe(group, names, listener, NO_CODEC);
336    }
337
338    /**
339     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
340     * for instance index 0 is lag for partition 0.
341     *
342     * @since 11.1
343     */
344    List<LogLag> getLagPerPartition(Name name, Name group);
345
346    /**
347     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
348     * for instance index 0 is lag for partition 0.
349     *
350     * @since 10.2
351     * @deprecated since 11.1 use {@link #getLagPerPartition(Name, Name)} instead
352     */
353    @Deprecated(since = "11.1")
354    default List<LogLag> getLagPerPartition(String name, String group) {
355        return getLagPerPartition(Name.ofUrn(name), Name.ofUrn(group));
356    }
357
358    /**
359     * Returns the lag between consumer {@code group} and producers for a Log.
360     *
361     * @since 11.1
362     */
363    default LogLag getLag(Name name, Name group) {
364        return LogLag.of(getLagPerPartition(name, group));
365    }
366
367    /**
368     * Returns the lag between consumer {@code group} and producers for a Log.
369     *
370     * @since 10.2
371     * @deprecated since 11.1 use {@link #getLag(Name, Name)} instead
372     */
373    @Deprecated(since = "11.1")
374    default LogLag getLag(String name, String group) {
375        return getLag(Name.ofUrn(name), Name.ofUrn(group));
376    }
377
378    /**
379     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
380     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(Name, Name)}.
381     * <p>
382     * Two functions need to be provided to extract the timestamp and a key from a record.
383     *
384     * @since 11.1
385     */
386    <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec,
387            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor);
388
389    /**
390     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
391     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(Name, Name)}.
392     * <p>
393     * Two functions need to be provided to extract the timestamp and a key from a record.
394     *
395     * @since 10.2
396     * @deprecated since 11.1 use {@link #getLatencyPerPartition(Name, Name, Codec, Function, Function)} instead
397     */
398    @Deprecated(since = "11.1")
399    default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec,
400            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
401        return getLatencyPerPartition(Name.ofUrn(name), Name.ofUrn(group), codec, timestampExtractor, keyExtractor);
402    }
403
404    /**
405     * Returns the latency between consumer {@code group} and producers for a Log.
406     *
407     * @since 11.1
408     */
409    default <M extends Externalizable> Latency getLatency(Name name, Name group, Codec<M> codec,
410            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
411        return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor));
412    }
413
414    /**
415     * Returns the latency between consumer {@code group} and producers for a Log.
416     *
417     * @since 10.2
418     * @deprecated since 11.1 use {@link #getLatencyPerPartition(Name, Name, Codec, Function, Function)} instead
419     */
420    @Deprecated(since = "11.1")
421    default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec,
422                                                          Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
423        return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor));
424    }
425
426    /**
427     * Returns all the Log names.
428     *
429     * @deprecated since 11.1 use {@link #listAllNames()} instead
430     */
431    @Deprecated(since = "11.1")
432    default List<String> listAll() {
433        return listAllNames().stream().map(Name::getUrn).collect(Collectors.toList());
434    }
435
436    /**
437     * Returns all the Log names.
438     *
439     * @since 11.1
440     */
441    List<Name> listAllNames();
442
443    /**
444     * List the consumer groups for a Log.
445     *
446     * @since 11.1
447     * @implNote Note that for Kafka it returns only consumers that use the subscribe API.
448     */
449    List<Name> listConsumerGroups(Name name);
450
451    /**
452     * List the consumer groups for a Log.
453     *
454     * @since 10.2
455     * @deprecated since 11.1 use {@link #listConsumerGroups(Name)} instead
456     * @implNote Note that for Kafka it returns only consumers that use the subscribe API.
457     */
458    @Deprecated(since = "11.1")
459    default List<String> listConsumerGroups(String name) {
460        return listConsumerGroups(Name.ofUrn(name)).stream().map(Name::getUrn).collect(Collectors.toList());
461    }
462
463    @Override
464    void close();
465}