001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.List; 027import java.util.function.Function; 028import java.util.stream.Collectors; 029import java.util.stream.IntStream; 030 031import org.nuxeo.lib.stream.codec.Codec; 032 033/** 034 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and 035 * tailers. 036 * 037 * @since 9.3 038 */ 039@SuppressWarnings("unchecked") 040public interface LogManager extends AutoCloseable { 041 042 /** 043 * Returns {@code true} if a Log with this {@code name} exists. 044 */ 045 boolean exists(String name); 046 047 /** 048 * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been 049 * created. 050 */ 051 boolean createIfNotExists(String name, int size); 052 053 /** 054 * Tries to delete a Log. Returns true if successfully deleted, might not be possible depending on the 055 * implementation. 056 */ 057 boolean delete(String name); 058 059 /** 060 * Returns the number of partition of a Log. 061 * 062 * @since 10.2 063 */ 064 int size(String name); 065 066 /** 067 * Gets an appender for the Log named {@code name}, uses {@code codec} to encode records. An appender is thread 068 * safe. 069 * 070 * @since 10.2 071 */ 072 <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec); 073 074 /** 075 * Gets an appender for the Log named {@code name}, uses an already defined codec or the legacy encoding to encode 076 * records. An appender is thread safe. 077 */ 078 default <M extends Externalizable> LogAppender<M> getAppender(String name) { 079 return getAppender(name, NO_CODEC); 080 } 081 082 /** 083 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Uses {@code codec} to 084 * decode records. Note that {@code partitions} can be from different Logs. A tailer is NOT thread safe. 085 * 086 * @since 10.2 087 */ 088 <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions, 089 Codec<M> codec); 090 091 /** 092 * Creates a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that 093 * {@code partitions} can be from different Logs. Reads records using the legacy decoder. A tailer is NOT thread 094 * safe. 095 */ 096 default <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) { 097 return createTailer(group, partitions, NO_CODEC); 098 } 099 100 /** 101 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Reads records using the 102 * legacy decoder. A tailer is NOT thread safe. 103 */ 104 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) { 105 return createTailer(group, partition, NO_CODEC); 106 } 107 108 /** 109 * Creates a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. Reads records using 110 * the legacy decoder. A tailer is NOT thread safe. 111 */ 112 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) { 113 return createTailer(group, name, NO_CODEC); 114 } 115 116 /** 117 * Creates a tailer for a consumer {@code group} and assign a single {@code partition}. Use an explicit codec to 118 * decode records. A tailer is NOT thread safe. 119 * 120 * @since 10.2 121 */ 122 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition, Codec<M> codec) { 123 return createTailer(group, Collections.singletonList(partition), codec); 124 } 125 126 /** 127 * Creates a tailer for a consumer {@code group} and assigns all {@code partitions} of the Log. Uses {@code codec} 128 * to decode records. A tailer is NOT thread safe. 129 * 130 * @since 10.2 131 */ 132 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name, Codec<M> codec) { 133 int partitions = size(name); 134 if (partitions <= 0) { 135 throw new IllegalArgumentException("Log name: " + name + " not found"); 136 } 137 return createTailer(group, 138 IntStream.range(0, partitions) 139 .boxed() 140 .map(partition -> new LogPartition(name, partition)) 141 .collect(Collectors.toList()), 142 codec); 143 } 144 145 /** 146 * Returns {@code true} if the Log {@link #subscribe} method is supported. 147 */ 148 boolean supportSubscribe(); 149 150 /** 151 * Creates a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done 152 * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called 153 * a rebalancing. A listener can be used to be notified on assignment changes. Uses {@code codec} to decode records. 154 * <p/> 155 * A tailer is NOT thread safe. 156 * <p/> 157 * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}. 158 */ 159 <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 160 RebalanceListener listener, Codec<M> codec); 161 162 default <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 163 RebalanceListener listener) { 164 return subscribe(group, names, listener, NO_CODEC); 165 } 166 167 /** 168 * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered, 169 * for instance index 0 is lag for partition 0. 170 */ 171 List<LogLag> getLagPerPartition(String name, String group); 172 173 /** 174 * Returns the lag between consumer {@code group} and producers for a Log. 175 */ 176 default LogLag getLag(String name, String group) { 177 return LogLag.of(getLagPerPartition(name, group)); 178 } 179 180 /** 181 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 182 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}. 183 * <br/> 184 * Two functions need to be provided to extract the timestamp and a key from a record. 185 * 186 * @since 10.2 187 */ 188 <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec, 189 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor); 190 191 /** 192 * Returns the latency between consumer {@code group} and producers for a Log. 193 * 194 * @since 10.2 195 */ 196 default <M extends Externalizable> Latency getLatency(String name, String group, Codec<M> codec, 197 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 198 return Latency.of(getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor)); 199 } 200 201 /** 202 * Returns the lag with latency. Timestamps used to compute the latencies are extracted from the records. This 203 * requires to read one record per partition so it costs more than {@link #getLagPerPartition(String, String)}. 204 * <br/> 205 * Two functions need to be provided to extract the timestamp and a key from a record. 206 * 207 * @since 10.1 208 * @deprecated 10.2 use {@link #getLatencyPerPartition(String, String, Codec, Function, Function)} instead. 209 */ 210 @Deprecated 211 default <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, 212 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 213 return getLatencyPerPartition(name, group, NO_CODEC, timestampExtractor, keyExtractor); 214 } 215 216 /** 217 * Returns the latency between consumer {@code group} and producers for a Log. 218 * 219 * @since 10.1 220 * @deprecated 10.2 use {@link #getLatency(String, String, Codec, Function, Function)} instead. 221 */ 222 @Deprecated 223 default <M extends Externalizable> Latency getLatency(String name, String group, 224 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 225 return getLatency(name, group, NO_CODEC, timestampExtractor, keyExtractor); 226 } 227 228 /** 229 * Returns all the Log names. 230 */ 231 List<String> listAll(); 232 233 /** 234 * List the consumer groups for a Log.<br/> 235 * Note that for Kafka it returns only consumers that use the subscribe API. 236 */ 237 List<String> listConsumerGroups(String name); 238 239 @Override 240 void close(); 241}