001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.internals; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.time.Duration; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import java.util.Objects; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.function.Function; 034 035import org.nuxeo.lib.stream.StreamRuntimeException; 036import org.nuxeo.lib.stream.codec.Codec; 037import org.nuxeo.lib.stream.log.Latency; 038import org.nuxeo.lib.stream.log.LogAppender; 039import org.nuxeo.lib.stream.log.LogLag; 040import org.nuxeo.lib.stream.log.LogManager; 041import org.nuxeo.lib.stream.log.LogOffset; 042import org.nuxeo.lib.stream.log.LogPartition; 043import org.nuxeo.lib.stream.log.LogRecord; 044import org.nuxeo.lib.stream.log.LogTailer; 045import org.nuxeo.lib.stream.log.RebalanceListener; 046 047public abstract class AbstractLogManager implements LogManager { 048 protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>(); 049 050 protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>(); 051 052 // this define a concurrent set of tailers 053 protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>()); 054 055 protected abstract void create(String name, int size); 056 057 protected abstract int getSize(String name); 058 059 protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec); 060 061 protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, 062 String group, Codec<M> codec); 063 064 protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, 065 RebalanceListener listener, Codec<M> codec); 066 067 @Override 068 public synchronized boolean createIfNotExists(String name, int size) { 069 if (!exists(name)) { 070 create(name, size); 071 return true; 072 } 073 return false; 074 } 075 076 @Override 077 public boolean delete(String name) { 078 return false; 079 } 080 081 @Override 082 public int size(String name) { 083 if (appenders.containsKey(name)) { 084 return appenders.get(name).size(); 085 } 086 return getSize(name); 087 } 088 089 @Override 090 public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions, 091 Codec<M> codec) { 092 Objects.requireNonNull(codec); 093 partitions.forEach(partition -> checkInvalidAssignment(group, partition)); 094 Codec<M> tailerCodec = NO_CODEC.equals(codec) ? guessCodec(partitions) : codec; 095 partitions.forEach(partition -> checkInvalidCodec(partition, tailerCodec)); 096 LogTailer<M> ret = doCreateTailer(partitions, group, tailerCodec); 097 partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret)); 098 tailers.add(ret); 099 return ret; 100 } 101 102 @SuppressWarnings("unchecked") 103 protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> partitions) { 104 for (LogPartition partition : partitions) { 105 if (appenders.containsKey(partition.name())) { 106 return (Codec<M>) getAppender(partition.name()).getCodec(); 107 } 108 } 109 return NO_CODEC; 110 } 111 112 @Override 113 public boolean supportSubscribe() { 114 return false; 115 } 116 117 @Override 118 public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 119 RebalanceListener listener, Codec<M> codec) { 120 Objects.requireNonNull(codec); 121 LogTailer<M> ret = doSubscribe(group, names, listener, codec); 122 tailers.add(ret); 123 return ret; 124 } 125 126 protected void checkInvalidAssignment(String group, LogPartition partition) { 127 LogPartitionGroup key = new LogPartitionGroup(group, partition); 128 LogTailer ret = tailersAssignments.get(key); 129 if (ret != null && !ret.closed()) { 130 throw new IllegalArgumentException( 131 "Tailer for this partition already created: " + partition + ", group: " + group); 132 } 133 if (!exists(partition.name())) { 134 throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name()); 135 } 136 } 137 138 @SuppressWarnings("unchecked") 139 protected void checkInvalidCodec(LogPartition partition, Codec codec) { 140 if (appenders.containsKey(partition.name())) { 141 getAppender(partition.name(), codec); 142 } 143 } 144 145 @SuppressWarnings("unchecked") 146 @Override 147 public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec) { 148 LogAppender<M> ret = (LogAppender<M>) appenders.computeIfAbsent(name, n -> { 149 if (exists(n)) { 150 return createAppender(n, codec); 151 } 152 throw new IllegalArgumentException("Unknown Log name: " + n); 153 }); 154 if (NO_CODEC.equals(codec) || sameCodec(ret.getCodec(), codec)) { 155 return ret; 156 } 157 throw new IllegalArgumentException(String.format( 158 "The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", name, 159 ret.getCodec(), codec)); 160 } 161 162 protected boolean sameCodec(Codec codec1, Codec codec2) { 163 return codec1 == codec2 164 || !NO_CODEC.equals(codec1) && !NO_CODEC.equals(codec2) && codec1.getClass().isInstance(codec2); 165 } 166 167 @Override 168 public <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec, 169 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 170 long now = System.currentTimeMillis(); 171 List<LogLag> lags = getLagPerPartition(name, group); 172 List<Latency> ret = new ArrayList<>(lags.size()); 173 int partition = 0; 174 for (LogLag lag : lags) { 175 if (lag.upper() == 0 || lag.lower() == 0) { 176 // empty partition or the group has not consumed any message 177 ret.add(new Latency(0, now, lag, null)); 178 partition++; 179 continue; 180 } 181 // the committed offset point to the next record to process, here we want the last committed offset 182 // which is the previous one 183 LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1); 184 try (LogTailer<M> tailer = createTailer("tools", offset.partition(), codec)) { 185 tailer.seek(offset); 186 LogRecord<M> record = tailer.read(Duration.ofSeconds(1)); 187 if (record == null) { 188 throw new IllegalStateException("Unable to read " + offset + " lag: " + lag); 189 } else { 190 try { 191 long timestamp = timestampExtractor.apply(record.message()); 192 String key = keyExtractor.apply(record.message()); 193 ret.add(new Latency(timestamp, now, lag, key)); 194 } catch (ClassCastException e) { 195 throw new IllegalStateException("Unexpected record type" + e.getMessage()); 196 } 197 } 198 } catch (InterruptedException e) { 199 Thread.currentThread().interrupt(); 200 throw new StreamRuntimeException(e); 201 } 202 partition++; 203 } 204 return ret; 205 } 206 207 @Override 208 public void close() { 209 appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close); 210 appenders.clear(); 211 tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close); 212 tailers.clear(); 213 tailersAssignments.clear(); 214 } 215 216}