001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.List; 027import java.util.function.Function; 028import java.util.stream.Collectors; 029import java.util.stream.IntStream; 030 031import org.nuxeo.lib.stream.codec.Codec; 032 033/** 034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and 035 * tailers. 036 * 037 * @since 9.3 038 */ 039@SuppressWarnings("unchecked") 040public interface LogManager extends AutoCloseable { 041 042 /** 043 * Returns {@code true} if a Log with this {@code name} exists. 044 * 045 * @since 11.1 046 */ 047 boolean exists(Name name); 048 049 /** 050 * @deprecated since 11.1 use {@link #exists(Name)} instead 051 */ 052 @Deprecated(since = "11.1") 053 default boolean exists(String name) { 054 return exists(Name.ofUrn(name)); 055 } 056 057 /** 058 * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been 059 * created. 060 * 061 * @since 11.1 062 */ 063 boolean createIfNotExists(Name name, int size); 064 065 /** 066 * @deprecated since 11.1 use {@link #createIfNotExists(Name, int)} instead 067 */ 068 @Deprecated(since = "11.1") 069 default boolean createIfNotExists(String name, int size) { 070 return createIfNotExists(Name.ofUrn(name), size); 071 } 072 073 /** 074 * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the 075 * implementation. 076 * 077 * @since 11.1 078 */ 079 boolean delete(Name name); 080 081 /** 082 * @deprecated since 11.1 use {@link #delete(Name)} instead 083 */ 084 @Deprecated(since = "11.1") 085 default boolean delete(String name) { 086 return delete(Name.ofUrn(name)); 087 } 088 089 /** 090 * Returns the number of partition of a Log. 091 * 092 * @since 11.1 093 */ 094 int size(Name name); 095 096 /** 097 * Returns the number of partition of a Log. 098 * 099 * @since 10.2 100 * @deprecated since 11.1 use {@link #size(Name)} instead 101 */ 102 @Deprecated(since = "11.1") 103 default int size(String name) { 104 return size(Name.ofUrn(name)); 105 } 106 107 /** 108 * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread 109 * safe. 110 * 111 * @since 10.2 112 */ 113 <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec); 114 115 /** 116 * @deprecated since 11.1 use {@link #getAppender(Name, Codec)} instead 117 */ 118 @Deprecated(since = "11.1") 119 default <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec) { 120 return getAppender(Name.ofUrn(name), codec); 121 } 122 123 /** 124 * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode 125 * records. An appender is thread safe. 126 * 127 * @since 11.1 128 */ 129 default <M extends Externalizable> LogAppender<M> getAppender(Name name) { 130 return getAppender(name, NO_CODEC); 131 } 132 133 /** 134 * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode 135 * records. An appender is thread safe. 136 * 137 * @since 10.2 138 * @deprecated since 11.1 use {@link #getAppender(Name)} instead 139 */ 140 @Deprecated(since = "11.1") 141 default <M extends Externalizable> LogAppender<M> getAppender(String name) { 142 return getAppender(name, NO_CODEC); 143 } 144 145 /** 146 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to 147 * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe. 148 * 149 * @since 11.1 150 */ 151 <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions, 152 Codec<M> codec); 153 154 /** 155 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to 156 * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe. 157 * 158 * @since 10.2 159 * @deprecated since 11.1 use {@link #createTailer(Name, Name, Codec)} (Name)} instead 160 */ 161 @Deprecated(since = "11.1") 162 default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions, 163 Codec<M> codec) { 164 return createTailer(Name.ofUrn(group), partitions, codec); 165 } 166 167 /** 168 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that 169 * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread 170 * safe. 171 * 172 * @since 11.1 173 */ 174 default <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions) { 175 return createTailer(group, partitions, NO_CODEC); 176 } 177 178 /** 179 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that 180 * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread 181 * safe. 182 * 183 * @since 10.2 184 * @deprecated since 11.1 use {@link #createTailer(Name, Collection) (Name)} instead 185 */ 186 @Deprecated(since = "11.1") 187 default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) { 188 return createTailer(Name.ofUrn(group), partitions); 189 } 190 191 /** 192 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the 193 * legacy decoder. A tailer is NOT thread safe. 194 * 195 * @since 11.1 196 */ 197 default <M extends Externalizable> LogTailer<M> createTailer(Name group, LogPartition partition) { 198 return createTailer(group, partition, NO_CODEC); 199 } 200 201 /** 202 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the 203 * legacy decoder. A tailer is NOT thread safe. 204 * 205 * @since 10.2 206 * @deprecated since 11.1 use {@link #createTailer(Name, Collection) (Name)} instead 207 */ 208 @Deprecated(since = "11.1") 209 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) { 210 return createTailer(Name.ofUrn(group), partition); 211 } 212 213 /** 214 * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using 215 * the legacy decoder. A tailer is NOT thread safe. 216 * 217 * @since 11.1 218 */ 219 default <M extends Externalizable> LogTailer<M> createTailer(Name group, Name name) { 220 return createTailer(group, name, NO_CODEC); 221 } 222 223 /** 224 * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using 225 * the legacy decoder. A tailer is NOT thread safe. 226 * 227 * @since 10.2 228 * @deprecated since 11.1 use {@link #createTailer(Name, Name)} (Name)} instead 229 */ 230 @Deprecated(since = "11.1") 231 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) { 232 return createTailer(Name.ofUrn(group), Name.ofUrn(name)); 233 } 234 235 /** 236 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to 237 * decode records. A tailer is NOT thread safe. 238 * 239 * @since 11.1 240 */ 241 default <M extends Externalizable> LogTailer<M> createTailer(Name group, LogPartition partition, Codec<M> codec) { 242 return createTailer(group, Collections.singletonList(partition), codec); 243 } 244 245 /** 246 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to 247 * decode records. A tailer is NOT thread safe. 248 * 249 * @since 10.2 250 * @deprecated since 11.1 use {@link #createTailer(Name, LogPartition, Codec)} instead 251 */ 252 @Deprecated(since = "11.1") 253 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) { 254 return createTailer(Name.ofUrn(group), Collections.singletonList(partition), codec); 255 } 256 257 /** 258 * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec} 259 * to decode records. A tailer is NOT thread safe. 260 * 261 * @since 11.1 262 */ 263 default <M extends Externalizable> LogTailer<M> createTailer(Name group, Name name, Codec<M> codec) { 264 int partitions = size(name); 265 if (partitions <= 0) { 266 throw new IllegalArgumentException("Log name: " + name + " not found"); 267 } 268 return createTailer(group, 269 IntStream.range(0, partitions) 270 .boxed() 271 .map(partition -> new LogPartition(name, partition)) 272 .collect(Collectors.toList()), 273 codec); 274 } 275 276 /** 277 * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec} 278 * to decode records. A tailer is NOT thread safe. 279 * 280 * @since 10.2 281 * @deprecated since 11.1 use {@link #createTailer(Name, Name, Codec)} instead 282 */ 283 @Deprecated(since = "11.1") 284 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) { 285 return createTailer(Name.ofUrn(group), Name.ofUrn(name), codec); 286 } 287 288 /** 289 * Returns {@code true} if the Log {@link #subscribe} method is supported. 290 */ 291 boolean supportSubscribe(); 292 293 /** 294 * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done 295 * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called 296 * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records. 297 * 298 * @since 11.1 299 * @implSpec You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}. 300 * @implNote A tailer is NOT thread safe. 301 */ 302 <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names, 303 RebalanceListener listener, Codec<M> codec); 304 305 /** 306 * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done 307 * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called 308 * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records. 309 * 310 * @since 10.2 311 * @deprecated since 11.1 use {@link #subscribe(Name, Collection, RebalanceListener, Codec)} instead 312 * @implSpec You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}. 313 * @implNote A tailer is NOT thread safe. 314 */ 315 @Deprecated(since = "11.1") 316 default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 317 RebalanceListener listener, Codec<M> codec) { 318 return subscribe(Name.ofUrn(group), names.stream().map(Name::ofUrn).collect(Collectors.toList()), listener, 319 codec); 320 } 321 322 // @since 11.1 323 default <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names, 324 RebalanceListener listener) { 325 return subscribe(group, names, listener, NO_CODEC); 326 } 327 328 /** 329 * @since 10.2 330 * @deprecated since 11.1 use {@link #subscribe(Name, Collection, RebalanceListener)} instead 331 */ 332 @Deprecated(since = "11.1") 333 default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 334 RebalanceListener listener) { 335 return subscribe(group, names, listener, NO_CODEC); 336 } 337 338 /** 339 * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered, 340 * for instance index 0 is lag for partition 0. 341 * 342 * @since 11.1 343 */ 344 List<LogLag> getLagPerPartition(Name name, Name group); 345 346 /** 347 * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered, 348 * for instance index 0 is lag for partition 0. 349 * 350 * @since 10.2 351 * @deprecated since 11.1 use {@link #getLagPerPartition(Name, Name)} instead 352 */ 353 @Deprecated(since = "11.1") 354 default List<LogLag> getLagPerPartition(String name, String group) { 355 return getLagPerPartition(Name.ofUrn(name), Name.ofUrn(group)); 356 } 357 358 /** 359 * Returns the lag between consumer {@code group} and producers for a Log. 360 * 361 * @since 11.1 362 */ 363 default LogLag getLag(Name name, Name group) { 364 return LogLag.of(getLagPerPartition(name, group)); 365 } 366 367 /** 368 * Returns the lag between consumer {@code group} and producers for a Log. 369 * 370 * @since 10.2 371 * @deprecated since 11.1 use {@link #getLag(Name, Name)} instead 372 */ 373 @Deprecated(since = "11.1") 374 default LogLag getLag(String name, String group) { 375 return getLag(Name.ofUrn(name), Name.ofUrn(group)); 376 } 377 378 /** 379 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 380 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(Name, Name)}. 381 * <p> 382 * Two functions need to be provided to extract the timestamp and a key from a record. 383 * 384 * @since 11.1 385 */ 386 <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec, 387 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor); 388 389 /** 390 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 391 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(Name, Name)}. 392 * <p> 393 * Two functions need to be provided to extract the timestamp and a key from a record. 394 * 395 * @since 10.2 396 * @deprecated since 11.1 use {@link #getLatencyPerPartition(Name, Name, Codec, Function, Function)} instead 397 */ 398 @Deprecated(since = "11.1") 399 default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec, 400 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 401 return getLatencyPerPartition(Name.ofUrn(name), Name.ofUrn(group), codec, timestampExtractor, keyExtractor); 402 } 403 404 /** 405 * Returns the latency between consumer {@code group} and producers for a Log. 406 * 407 * @since 11.1 408 */ 409 default <M extends Externalizable> Latency getLatency(Name name, Name group, Codec<M> codec, 410 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 411 return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor)); 412 } 413 414 /** 415 * Returns the latency between consumer {@code group} and producers for a Log. 416 * 417 * @since 10.2 418 * @deprecated since 11.1 use {@link #getLatencyPerPartition(Name, Name, Codec, Function, Function)} instead 419 */ 420 @Deprecated(since = "11.1") 421 default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec, 422 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 423 return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor)); 424 } 425 426 /** 427 * Returns all the Log names. 428 * 429 * @deprecated since 11.1 use {@link #listAllNames()} instead 430 */ 431 @Deprecated(since = "11.1") 432 default List<String> listAll() { 433 return listAllNames().stream().map(Name::getUrn).collect(Collectors.toList()); 434 } 435 436 /** 437 * Returns all the Log names. 438 * 439 * @since 11.1 440 */ 441 List<Name> listAllNames(); 442 443 /** 444 * List the consumer groups for a Log. 445 * 446 * @since 11.1 447 * @implNote Note that for Kafka it returns only consumers that use the subscribe API. 448 */ 449 List<Name> listConsumerGroups(Name name); 450 451 /** 452 * List the consumer groups for a Log. 453 * 454 * @since 10.2 455 * @deprecated since 11.1 use {@link #listConsumerGroups(Name)} instead 456 * @implNote Note that for Kafka it returns only consumers that use the subscribe API. 457 */ 458 @Deprecated(since = "11.1") 459 default List<String> listConsumerGroups(String name) { 460 return listConsumerGroups(Name.ofUrn(name)).stream().map(Name::getUrn).collect(Collectors.toList()); 461 } 462 463 @Override 464 void close(); 465}