001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import static java.lang.Math.max;
022import static java.lang.Math.min;
023
024import java.io.Externalizable;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.List;
028import java.util.stream.Collectors;
029import java.util.stream.IntStream;
030
031/**
032 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
033 * tailers.
034 *
035 * @since 9.3
036 */
037public interface LogManager extends AutoCloseable {
038
039    /**
040     * Returns {@code true} if a Log with this {@code name} exists.
041     */
042    boolean exists(String name);
043
044    /**
045     * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been
046     * created.
047     */
048    boolean createIfNotExists(String name, int size);
049
050    /**
051     * Try to delete a Log. Returns true if successfully deleted, might not be possible depending on the implementation.
052     */
053    boolean delete(String name);
054
055    /**
056     * Get an appender for the Log named {@code name}. An appender is thread safe.
057     */
058    <M extends Externalizable> LogAppender<M> getAppender(String name);
059
060    /**
061     * Create a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that {@code partitions}
062     * can be from different Logs. A tailer is NOT thread safe.
063     */
064    <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions);
065
066    /**
067     * Create a tailer for a consumer {@code group} and assign a single {@code partition}. A tailer is NOT thread safe.
068     */
069    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
070        return createTailer(group, Collections.singletonList(partition));
071    }
072
073    /**
074     * Create a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. A tailer is NOT thread
075     * safe.
076     */
077    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
078        int size = getAppender(name).size();
079        return createTailer(group,
080                IntStream.range(0, size).boxed().map(partition -> new LogPartition(name, partition)).collect(
081                        Collectors.toList()));
082    }
083
084    /**
085     * Returns {@code true} if the Log {@link #subscribe} method is supported.
086     */
087    boolean supportSubscribe();
088
089    /**
090     * Create a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
091     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
092     * a rebalancing. A listener can be used to be notified on assignment changes.
093     * <p/>
094     * A tailer is NOT thread safe.
095     * <p/>
096     * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
097     */
098    <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
099            RebalanceListener listener);
100
101    /**
102     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
103     * for instance index 0 is lag for partition 0.
104     */
105    List<LogLag> getLagPerPartition(String name, String group);
106
107    /**
108     * Returns the lag between consumer {@code group} and producers for a Log.
109     */
110    default LogLag getLag(String name, String group) {
111        final long[] end = { 0 };
112        final long[] pos = { Long.MAX_VALUE };
113        final long[] lag = { 0 };
114        final long[] endMessages = { 0 };
115        getLagPerPartition(name, group).forEach(item -> {
116            if (item.lowerOffset() > 0) {
117                pos[0] = min(pos[0], item.lowerOffset());
118            }
119            end[0] = max(end[0], item.upperOffset());
120            endMessages[0] += item.upper();
121            lag[0] += item.lag();
122        });
123        return new LogLag(pos[0], end[0], lag[0], endMessages[0]);
124    }
125
126    /**
127     * Returns all the Log names.
128     */
129    List<String> listAll();
130
131    /**
132     * List the consumer groups for a Log.
133     */
134    List<String> listConsumerGroups(String name);
135
136    @Override
137    void close();
138}