001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import static java.lang.Math.max; 022import static java.lang.Math.min; 023 024import java.io.Externalizable; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.List; 028import java.util.stream.Collectors; 029import java.util.stream.IntStream; 030 031/** 032 * Manage Log and give access to Appenders and Tailers. Closing the LogManager will also close all its appenders and 033 * tailers. 034 * 035 * @since 9.3 036 */ 037public interface LogManager extends AutoCloseable { 038 039 /** 040 * Returns {@code true} if a Log with this {@code name} exists. 041 */ 042 boolean exists(String name); 043 044 /** 045 * Creates a new Log with {@code size} partitions if the Log does not exists. Returns true it the Log has been 046 * created. 047 */ 048 boolean createIfNotExists(String name, int size); 049 050 /** 051 * Try to delete a Log. Returns true if successfully deleted, might not be possible depending on the implementation. 052 */ 053 boolean delete(String name); 054 055 /** 056 * Get an appender for the Log named {@code name}. An appender is thread safe. 057 */ 058 <M extends Externalizable> LogAppender<M> getAppender(String name); 059 060 /** 061 * Create a tailer for a consumer {@code group} and assign multiple {@code partitions}. Note that {@code partitions} 062 * can be from different Logs. A tailer is NOT thread safe. 063 */ 064 <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions); 065 066 /** 067 * Create a tailer for a consumer {@code group} and assign a single {@code partition}. A tailer is NOT thread safe. 068 */ 069 default <M extends Externalizable> LogTailer<M> createTailer(String group, LogPartition partition) { 070 return createTailer(group, Collections.singletonList(partition)); 071 } 072 073 /** 074 * Create a tailer for a consumer {@code group} and assign all {@code partitions} of the Log. A tailer is NOT thread 075 * safe. 076 */ 077 default <M extends Externalizable> LogTailer<M> createTailer(String group, String name) { 078 int size = getAppender(name).size(); 079 return createTailer(group, 080 IntStream.range(0, size).boxed().map(partition -> new LogPartition(name, partition)).collect( 081 Collectors.toList())); 082 } 083 084 /** 085 * Returns {@code true} if the Log {@link #subscribe} method is supported. 086 */ 087 boolean supportSubscribe(); 088 089 /** 090 * Create a tailer for a consumer {@code group} and subscribe to multiple Logs. The partitions assignment is done 091 * dynamically depending on the number of subscribers. The partitions can change during tailers life, this is called 092 * a rebalancing. A listener can be used to be notified on assignment changes. 093 * <p/> 094 * A tailer is NOT thread safe. 095 * <p/> 096 * You should not mix {@link #createTailer} and {@code subscribe} usage using the same {@code group}. 097 */ 098 <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 099 RebalanceListener listener); 100 101 /** 102 * Returns the lag between consumer {@code group} and the producers for each partition. The result list is ordered, 103 * for instance index 0 is lag for partition 0. 104 */ 105 List<LogLag> getLagPerPartition(String name, String group); 106 107 /** 108 * Returns the lag between consumer {@code group} and producers for a Log. 109 */ 110 default LogLag getLag(String name, String group) { 111 final long[] end = { 0 }; 112 final long[] pos = { Long.MAX_VALUE }; 113 final long[] lag = { 0 }; 114 final long[] endMessages = { 0 }; 115 getLagPerPartition(name, group).forEach(item -> { 116 if (item.lowerOffset() > 0) { 117 pos[0] = min(pos[0], item.lowerOffset()); 118 } 119 end[0] = max(end[0], item.upperOffset()); 120 endMessages[0] += item.upper(); 121 lag[0] += item.lag(); 122 }); 123 return new LogLag(pos[0], end[0], lag[0], endMessages[0]); 124 } 125 126 /** 127 * Returns all the Log names. 128 */ 129 List<String> listAll(); 130 131 /** 132 * List the consumer groups for a Log. 133 */ 134 List<String> listConsumerGroups(String name); 135 136 @Override 137 void close(); 138}