001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.List; 027import java.util.function.Function; 028import java.util.stream.Collectors; 029import java.util.stream.IntStream; 030 031import org.nuxeo.lib.stream.codec.Codec; 032 033/** 034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and 035 * tailers. 036 * 037 * @since 9.3 038 */ 039@SuppressWarnings("unchecked") 040public interface LogManager extends AutoCloseable { 041 042 /** 043 * Returns {@code true} if a Log with this {@code name} exists. 044 */ 045 boolean exists(String name); 046 047 /** 048 * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been 049 * created. 050 */ 051 boolean createIfNotExists(String name, int size); 052 053 /** 054 * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the 055 * implementation. 056 */ 057 boolean delete(String name); 058 059 /** 060 * Returns the number of partition of a Log. 061 * 062 * @since 10.2 063 */ 064 int size(String name); 065 066 /** 067 * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread 068 * safe. 069 * 070 * @since 10.2 071 */ 072 <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec); 073 074 /** 075 * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode 076 * records. An appender is thread safe. 077 */ 078 default <M extends Externalizable> LogAppender<M> getAppender(String name) { 079 return getAppender(name, NO_CODEC); 080 } 081 082 /** 083 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to 084 * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe. 085 * 086 * @since 10.2 087 */ 088 <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions, 089 Codec<M> codec); 090 091 /** 092 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that 093 * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread 094 * safe. 095 */ 096 default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) { 097 return createTailer(group, partitions, NO_CODEC); 098 } 099 100 /** 101 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the 102 * legacy decoder. A tailer is NOT thread safe. 103 */ 104 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) { 105 return createTailer(group, partition, NO_CODEC); 106 } 107 108 /** 109 * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using 110 * the legacy decoder. A tailer is NOT thread safe. 111 */ 112 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) { 113 return createTailer(group, name, NO_CODEC); 114 } 115 116 /** 117 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to 118 * decode records. A tailer is NOT thread safe. 119 * 120 * @since 10.2 121 */ 122 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) { 123 return createTailer(group, Collections.singletonList(partition), codec); 124 } 125 126 /** 127 * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec} 128 * to decode records. A tailer is NOT thread safe. 129 * 130 * @since 10.2 131 */ 132 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) { 133 return createTailer(group, 134 IntStream.range(0, size(name)).boxed().map(partition -> new LogPartition(name, partition)).collect( 135 Collectors.toList()), 136 codec); 137 } 138 139 /** 140 * Returns {@code true} if the Log {@link #subscribe} method is supported. 141 */ 142 boolean supportSubscribe(); 143 144 /** 145 * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done 146 * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called 147 * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records. 148 * <p/> 149 * A tailer is NOT thread safe. 150 * <p/> 151 * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}. 152 */ 153 <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 154 RebalanceListener listener, Codec<M> codec); 155 156 default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 157 RebalanceListener listener) { 158 return subscribe(group, names, listener, NO_CODEC); 159 } 160 161 /** 162 * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered, 163 * for instance index 0 is lag for partition 0. 164 */ 165 List<LogLag> getLagPerPartition(String name, String group); 166 167 /** 168 * Returns the lag between consumer {@code group} and producers for a Log. 169 */ 170 default LogLag getLag(String name, String group) { 171 return LogLag.of(getLagPerPartition(name, group)); 172 } 173 174 /** 175 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 176 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}. 177 * <br/> 178 * Two functions need to be provided to extract the timestamp and a key from a record. 179 * 180 * @since 10.2 181 */ 182 <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec, 183 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor); 184 185 /** 186 * Returns the latency between consumer {@code group} and producers for a Log. 187 * 188 * @since 10.2 189 */ 190 default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec, 191 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 192 return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor)); 193 } 194 195 /** 196 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 197 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}. 198 * <br/> 199 * Two functions need to be provided to extract the timestamp and a key from a record. 200 * 201 * @since 10.1 202 * @deprecated 10.2 use {@link #getLatencyPerPartition(String, String, Codec, Function, Function)} instead. 203 */ 204 @Deprecated 205 default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, 206 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 207 return getLatencyPerPartition(name, group, NO_CODEC, timestampExtractor, keyExtractor); 208 } 209 210 /** 211 * Returns the latency between consumer {@code group} and producers for a Log. 212 * 213 * @since 10.1 214 * @deprecated 10.2 use {@link #getLatency(String, String, Codec, Function, Function)} instead. 215 */ 216 @Deprecated 217 default <M extends Externalizable> Latency getLatency(String name, String group, 218 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 219 return getLatency(name, group, NO_CODEC, timestampExtractor, keyExtractor); 220 } 221 222 /** 223 * Returns all the Log names. 224 */ 225 List<String> listAll(); 226 227 /** 228 * List the consumer groups for a Log.<br/> 229 * Note that for Kafka it returns only consumers that use the subscribe API. 230 */ 231 List<String> listConsumerGroups(String name); 232 233 @Override 234 void close(); 235}