001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.internals; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.List; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.function.Function; 032 033import org.nuxeo.lib.stream.log.Latency; 034import org.nuxeo.lib.stream.log.LogAppender; 035import org.nuxeo.lib.stream.log.LogLag; 036import org.nuxeo.lib.stream.log.LogManager; 037import org.nuxeo.lib.stream.log.LogOffset; 038import org.nuxeo.lib.stream.log.LogPartition; 039import org.nuxeo.lib.stream.log.LogRecord; 040import org.nuxeo.lib.stream.log.LogTailer; 041import org.nuxeo.lib.stream.log.RebalanceListener; 042 043public abstract class AbstractLogManager implements LogManager { 044 protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>(); 045 046 protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>(); 047 048 // this define a concurrent set of tailers 049 protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>()); 050 051 protected abstract void create(String name, int size); 052 053 protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name); 054 055 protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, 056 String group); 057 058 protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, 059 RebalanceListener listener); 060 061 @Override 062 public synchronized boolean createIfNotExists(String name, int size) { 063 if (!exists(name)) { 064 create(name, size); 065 return true; 066 } 067 return false; 068 } 069 070 @Override 071 public boolean delete(String name) { 072 return false; 073 } 074 075 @Override 076 public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) { 077 partitions.forEach(partition -> checkInvalidAssignment(group, partition)); 078 LogTailer<M> ret = doCreateTailer(partitions, group); 079 partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret)); 080 tailers.add(ret); 081 return ret; 082 } 083 084 @Override 085 public boolean supportSubscribe() { 086 return false; 087 } 088 089 @Override 090 public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 091 RebalanceListener listener) { 092 LogTailer<M> ret = doSubscribe(group, names, listener); 093 tailers.add(ret); 094 return ret; 095 } 096 097 protected void checkInvalidAssignment(String group, LogPartition partition) { 098 LogPartitionGroup key = new LogPartitionGroup(group, partition); 099 LogTailer ret = tailersAssignments.get(key); 100 if (ret != null && !ret.closed()) { 101 throw new IllegalArgumentException( 102 "Tailer for this partition already created: " + partition + ", group: " + group); 103 } 104 if (!exists(partition.name())) { 105 throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name()); 106 } 107 } 108 109 @SuppressWarnings("unchecked") 110 @Override 111 public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name) { 112 return (LogAppender<M>) appenders.computeIfAbsent(name, n -> { 113 if (exists(n)) { 114 return createAppender(n); 115 } 116 throw new IllegalArgumentException("unknown Log name: " + n); 117 }); 118 } 119 120 @Override 121 public <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, 122 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 123 long now = System.currentTimeMillis(); 124 List<LogLag> lags = getLagPerPartition(name, group); 125 List<Latency> ret = new ArrayList<>(lags.size()); 126 int partition = 0; 127 for (LogLag lag : lags) { 128 if (lag.upper() == 0 || lag.lower() == 0) { 129 // empty partition or the group has not consumed any message 130 ret.add(new Latency(0, now, lag, null)); 131 partition++; 132 continue; 133 } 134 // the committed offset point to the next record to process, here we want the last committed offset 135 // which is the previous one 136 LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1); 137 try (LogTailer<M> tailer = createTailer("tools", offset.partition())) { 138 tailer.seek(offset); 139 LogRecord<M> record = tailer.read(Duration.ofSeconds(1)); 140 if (record == null) { 141 throw new IllegalStateException("Unable to read " + offset + " lag: " + lag); 142 } else { 143 try { 144 long timestamp = timestampExtractor.apply(record.message()); 145 String key = keyExtractor.apply(record.message()); 146 ret.add(new Latency(timestamp, now, lag, key)); 147 } catch (ClassCastException e) { 148 throw new IllegalStateException("Unexpected record type" + e.getMessage()); 149 } 150 } 151 } catch (InterruptedException e) { 152 Thread.currentThread().interrupt(); 153 throw new RuntimeException(e); 154 } 155 partition++; 156 } 157 return ret; 158 } 159 160 @Override 161 public void close() { 162 appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close); 163 appenders.clear(); 164 tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close); 165 tailers.clear(); 166 tailersAssignments.clear(); 167 } 168 169}