001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.internals; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.time.Duration; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import java.util.Objects; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.function.Function; 034 035import org.nuxeo.lib.stream.StreamRuntimeException; 036import org.nuxeo.lib.stream.codec.Codec; 037import org.nuxeo.lib.stream.log.Latency; 038import org.nuxeo.lib.stream.log.LogAppender; 039import org.nuxeo.lib.stream.log.LogLag; 040import org.nuxeo.lib.stream.log.LogManager; 041import org.nuxeo.lib.stream.log.LogOffset; 042import org.nuxeo.lib.stream.log.LogPartition; 043import org.nuxeo.lib.stream.log.LogRecord; 044import org.nuxeo.lib.stream.log.LogTailer; 045import org.nuxeo.lib.stream.log.Name; 046import org.nuxeo.lib.stream.log.RebalanceListener; 047 048public abstract class AbstractLogManager implements LogManager { 049 protected static final Name ADMIN_GROUP = Name.of("admin", "tailer"); 050 051 protected final Map<Name, CloseableLogAppender> appenders = new ConcurrentHashMap<>(); 052 053 protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>(); 054 055 // this define a concurrent set of tailers 056 protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<>()); 057 058 protected abstract void create(Name name, int size); 059 060 protected abstract int getSize(Name name); 061 062 protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec); 063 064 protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, 065 Name group, Codec<M> codec); 066 067 protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names, 068 RebalanceListener listener, Codec<M> codec); 069 070 public abstract List<LogLag> getLagPerPartition(Name name, Name group); 071 072 @Override 073 public synchronized boolean createIfNotExists(Name name, int size) { 074 if (!exists(name)) { 075 create(name, size); 076 return true; 077 } 078 return false; 079 } 080 081 @Override 082 public boolean delete(Name name) { 083 return false; 084 } 085 086 @Override 087 public int size(Name name) { 088 if (appenders.containsKey(name)) { 089 return appenders.get(name).size(); 090 } 091 return getSize(name); 092 } 093 094 @Override 095 public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions, 096 Codec<M> codec) { 097 Objects.requireNonNull(codec); 098 partitions.forEach(partition -> checkInvalidAssignment(group, partition)); 099 Codec<M> tailerCodec = NO_CODEC.equals(codec) ? guessCodec(partitions) : codec; 100 partitions.forEach(partition -> checkInvalidCodec(partition, tailerCodec)); 101 LogTailer<M> ret = doCreateTailer(partitions, group, tailerCodec); 102 cleanTailers(); 103 partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret)); 104 tailers.add(ret); 105 return ret; 106 } 107 108 @SuppressWarnings("unchecked") 109 protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> partitions) { 110 for (LogPartition partition : partitions) { 111 if (appenders.containsKey(partition.name())) { 112 return (Codec<M>) getAppender(partition.name()).getCodec(); 113 } 114 } 115 return NO_CODEC; 116 } 117 118 @Override 119 public boolean supportSubscribe() { 120 return false; 121 } 122 123 @Override 124 public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names, 125 RebalanceListener listener, Codec<M> codec) { 126 Objects.requireNonNull(codec); 127 LogTailer<M> ret = doSubscribe(group, names, listener, codec); 128 cleanTailers(); 129 tailers.add(ret); 130 return ret; 131 } 132 133 protected void checkInvalidAssignment(Name group, LogPartition partition) { 134 LogPartitionGroup key = new LogPartitionGroup(group, partition); 135 LogTailer<?> ret = tailersAssignments.get(key); 136 if (ret != null && !ret.closed()) { 137 throw new IllegalArgumentException( 138 "Tailer for this partition already created: " + partition + ", group: " + group); 139 } 140 if (!exists(partition.name())) { 141 throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name()); 142 } 143 } 144 145 protected <M extends Externalizable> void checkInvalidCodec(LogPartition partition, Codec<M> codec) { 146 if (appenders.containsKey(partition.name())) { 147 getAppender(partition.name(), codec); 148 } 149 } 150 151 @SuppressWarnings("unchecked") 152 @Override 153 public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) { 154 LogAppender<M> ret = appenders.computeIfAbsent(name, n -> { 155 if (exists(n)) { 156 return createAppender(n, codec); 157 } 158 throw new IllegalArgumentException("Unknown Log name: " + n); 159 }); 160 if (NO_CODEC.equals(codec) || sameCodec(ret.getCodec(), codec)) { 161 return ret; 162 } 163 throw new IllegalArgumentException(String.format( 164 "The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", name, 165 ret.getCodec(), codec)); 166 } 167 168 protected <M extends Externalizable> boolean sameCodec(Codec<M> codec1, Codec<M> codec2) { 169 return codec1 == codec2 170 || !NO_CODEC.equals(codec1) && !NO_CODEC.equals(codec2) && codec1.getClass().isInstance(codec2); 171 } 172 173 @Override 174 public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec, 175 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 176 long now = System.currentTimeMillis(); 177 List<LogLag> lags = getLagPerPartition(name, group); 178 List<Latency> ret = new ArrayList<>(lags.size()); 179 int partition = 0; 180 for (LogLag lag : lags) { 181 if (lag.upper() == 0 || lag.lower() == 0 || lag.lag() == 0) { 182 // empty partition or the group has not consumed any message or there is no lag 183 ret.add(new Latency(0, now, lag, null)); 184 partition++; 185 continue; 186 } 187 // the committed offset point to the next record to process, here we want the last committed offset 188 // which is the previous one 189 LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1); 190 try (LogTailer<M> tailer = createTailer(ADMIN_GROUP, offset.partition(), codec)) { 191 tailer.seek(offset); 192 LogRecord<M> record = tailer.read(Duration.ofSeconds(1)); 193 if (record == null) { 194 // the beginning of the partition is not necessary offset 0 after retention policy is applied 195 ret.add(new Latency(0, now, lag, null)); 196 } else { 197 long timestamp = timestampExtractor.apply(record.message()); 198 String key = keyExtractor.apply(record.message()); 199 ret.add(new Latency(timestamp, now, lag, key)); 200 } 201 } catch (ClassCastException e) { 202 throw new IllegalStateException("Unexpected record type" + e.getMessage()); 203 } catch (InterruptedException e) { 204 Thread.currentThread().interrupt(); 205 throw new StreamRuntimeException(e); 206 } 207 partition++; 208 } 209 return ret; 210 } 211 212 protected void cleanTailers() { 213 tailers.removeIf(LogTailer::closed); 214 tailersAssignments.values().removeIf(LogTailer::closed); 215 } 216 217 @Override 218 public void close() { 219 appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close); 220 appenders.clear(); 221 tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close); 222 tailers.clear(); 223 tailersAssignments.clear(); 224 } 225 226}