/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log;

import java.io.Externalizable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and
 * tailers.
 *
 * @since 9.3
 */
public interface LogManager extends AutoCloseable {

    /**
     * Returns {@code true} if a Log with this {@code name} exists.
     *
     * @param name the Log name
     */
    boolean exists(String name);

    /**
     * Creates a new Log with {@code size} partitions if the Log does not exist. Returns true if the Log has been
     * created.
     *
     * @param name the Log name
     * @param size the number of partitions of the Log to create
     * @return {@code true} if the Log has been created
     */
    boolean createIfNotExists(String name, int size);

    /**
     * Try to delete a Log. Returns true if successfully deleted, might not be possible depending on the implementation.
     *
     * @param name the Log name
     * @return {@code true} if the Log has been successfully deleted
     */
    boolean delete(String name);

    /**
     * Get an appender for the Log named {@code name}. An appender is thread safe.
     *
     * @param name the Log name
     */
    <M extends Externalizable> LogAppender<M> getAppender(String name);

    /**
     * Create a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that {@code partitions}
     * can be from different Logs. A tailer is NOT thread safe.
     *
     * @param group the consumer group
     * @param partitions the partitions assigned to the tailer
     */
    <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions);

    /**
     * Create a tailer for a consumer {@code group} and assign a single {@code partition}. A tailer is NOT thread safe.
     * <p>
     * The default implementation delegates to {@link #createTailer(String, Collection)} with a singleton list.
     *
     * @param group the consumer group
     * @param partition the partition assigned to the tailer
     */
    default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) {
        return createTailer(group, Collections.singletonList(partition));
    }

    /**
     * Create a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. A tailer is NOT thread
     * safe.
     * <p>
     * The default implementation reads the number of partitions from the size of the appender of the Log, then
     * delegates to {@link #createTailer(String, Collection)} with the partitions {@code 0} to {@code size - 1}.
     *
     * @param group the consumer group
     * @param name the Log name
     */
    default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) {
        int size = getAppender(name).size();
        // mapToObj builds the partitions straight from the int indexes, without boxing each of them first
        List<LogPartition> partitions = IntStream.range(0, size)
                                                 .mapToObj(partition -> new LogPartition(name, partition))
                                                 .collect(Collectors.toList());
        return createTailer(group, partitions);
    }

    /**
     * Returns {@code true} if the Log {@link #subscribe} method is supported.
     */
    boolean supportSubscribe();

    /**
     * Create a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done
     * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called
     * a rebalancing. A listener can be used to be notified on assignment changes.
     * <p>
     * A tailer is NOT thread safe.
     * <p>
     * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}.
     *
     * @param group the consumer group
     * @param names the names of the Logs to subscribe to
     * @param listener the listener notified on assignment changes
     */
    <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
            RebalanceListener listener);

    /**
     * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered,
     * for instance index 0 is lag for partition 0.
     *
     * @param name the Log name
     * @param group the consumer group
     */
    List<LogLag> getLagPerPartition(String name, String group);

    /**
     * Returns the lag between consumer {@code group} and producers for a Log.
     * <p>
     * The default implementation combines the result of {@link #getLagPerPartition(String, String)} using
     * {@code LogLag.of}.
     *
     * @param name the Log name
     * @param group the consumer group
     */
    default LogLag getLag(String name, String group) {
        return LogLag.of(getLagPerPartition(name, group));
    }

    /**
     * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This
     * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}.
     * <br>
     * Two functions need to be provided to extract the timestamp and a key from a record.
     *
     * @param name the Log name
     * @param group the consumer group
     * @param timestampExtractor the function used to extract the timestamp from a record
     * @param keyExtractor the function used to extract a key from a record
     * @since 10.1
     */
    <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group,
            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor);

    /**
     * Returns the latency between consumer {@code group} and producers for a Log.
     * <p>
     * The default implementation combines the result of
     * {@link #getLatencyPerPartition(String, String, Function, Function)} using {@code Latency.of}.
     *
     * @param name the Log name
     * @param group the consumer group
     * @param timestampExtractor the function used to extract the timestamp from a record
     * @param keyExtractor the function used to extract a key from a record
     * @since 10.1
     */
    default <M extends Externalizable> Latency getLatency(String name, String group,
            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
        return Latency.of(getLatencyPerPartition(name, group, timestampExtractor, keyExtractor));
    }

    /**
     * Returns all the Log names.
     */
    List<String> listAll();

    /**
     * List the consumer groups for a Log.<br>
     * Note that for Kafka it returns only consumers that use the subscribe API.
     *
     * @param name the Log name
     */
    List<String> listConsumerGroups(String name);

    /**
     * Closes this LogManager. Redeclared without a {@code throws} clause so that callers do not have to handle the
     * checked exception declared by {@link AutoCloseable#close()}.
     */
    @Override
    void close();
}