001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import java.io.Externalizable;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.function.Function;
026import java.util.stream.Collectors;
027import java.util.stream.IntStream;
028
029/**
030 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
031 * tailers.
032 *
033 * @since 9.3
034 */
035public interface LogManager extends AutoCloseable {
036
037    /**
038     * Returns {@code true} if a Log with this {@code name} exists.
039     */
040    boolean exists(String name);
041
042    /**
043     * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been
044     * created.
045     */
046    boolean createIfNotExists(String name, int size);
047
048    /**
049     * Try to delete a Log. Returns true if successfully deleted, might not be possible depending on the implementation.
050     */
051    boolean delete(String name);
052
053    /**
054     * Get an appender for the Log named {@code name}. An appender is thread safe.
055     */
056    <M extends Externalizable> LogAppender<M> getAppender(String name);
057
058    /**
059     * Create a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that {@code partitions}
060     * can be from different Logs. A tailer is NOT thread safe.
061     */
062    <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions);
063
064    /**
065     * Create a tailer for a consumer {@code group} and assign a single {@code partition}. A tailer is NOT thread safe.
066     */
067    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
068        return createTailer(group, Collections.singletonList(partition));
069    }
070
071    /**
072     * Create a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. A tailer is NOT thread
073     * safe.
074     */
075    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
076        int size = getAppender(name).size();
077        return createTailer(group,
078                IntStream.range(0, size).boxed().map(partition -> new LogPartition(name, partition)).collect(
079                        Collectors.toList()));
080    }
081
082    /**
083     * Returns {@code true} if the Log {@link #subscribe} method is supported.
084     */
085    boolean supportSubscribe();
086
087    /**
088     * Create a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
089     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
090     * a rebalancing. A listener can be used to be notified on assignment changes.
091     * <p/>
092     * A tailer is NOT thread safe.
093     * <p/>
094     * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
095     */
096    <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
097            RebalanceListener listener);
098
099    /**
100     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
101     * for instance index 0 is lag for partition 0.
102     */
103    List<LogLag> getLagPerPartition(String name, String group);
104
105    /**
106     * Returns the lag between consumer {@code group} and producers for a Log.
107     */
108    default LogLag getLag(String name, String group) {
109        return LogLag.of(getLagPerPartition(name, group));
110    }
111
112    /**
113     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
114     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
115     * <br/>
116     * Two functions need to be provided to extract the timestamp and a key from a record.
117     *
118     * @since 10.1
119     */
120    <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group,
121            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor);
122
123    /**
124     * Returns the latency between consumer {@code group} and producers for a Log.
125     *
126     * @since 10.1
127     */
128    default <M extends Externalizable> Latency getLatency(String name, String group,
129            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
130        return Latency.of(getLatencyPerPartition(name, group, timestampExtractor, keyExtractor));
131    }
132
133    /**
134     * Returns all the Log names.
135     */
136    List<String> listAll();
137
138    /**
139     * List the consumer groups for a Log.<br/>
140     * Note that for Kafka it returns only consumers that use the subscribe API.
141     */
142    List<String> listConsumerGroups(String name);
143
144    @Override
145    void close();
146}