001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log; 020 021import java.io.Externalizable; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.function.Function; 028import java.util.stream.Collectors; 029 030import org.nuxeo.lib.stream.codec.Codec; 031import org.nuxeo.lib.stream.log.chronicle.ChronicleLogConfig; 032import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager; 033import org.nuxeo.lib.stream.log.kafka.KafkaLogConfig; 034import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 035 036/** 037 * @since 11.1 038 */ 039public class UnifiedLogManager implements LogManager { 040 041 protected final List<LogConfig> configs; 042 043 protected LogManager cqManager; 044 045 protected LogManager kafkaManager; 046 047 protected LogManager defaultManager; 048 049 protected LogConfig defaultConfig; 050 051 protected Map<LogConfig, LogManager> managers = new HashMap<>(); 052 053 public UnifiedLogManager(List<LogConfig> configs) { 054 if (configs == null || configs.isEmpty()) { 055 throw new IllegalArgumentException("No LogConfig provided"); 056 } 057 this.configs = configs; 058 createCQLogManager(); 059 createKafkaLogManager(); 060 findDefaultLogManger(); 061 } 062 063 protected void createCQLogManager() { 064 List<ChronicleLogConfig> cqConfigs = configs.stream() 065 .filter(config -> config instanceof ChronicleLogConfig) 066 .map(config -> (ChronicleLogConfig) config) 067 .collect(Collectors.toList()); 068 if (!cqConfigs.isEmpty()) { 069 cqManager = new ChronicleLogManager(cqConfigs); 070 cqConfigs.forEach(config -> managers.put(config, cqManager)); 071 } 072 } 073 074 protected void createKafkaLogManager() { 075 List<KafkaLogConfig> kafkaConfigs = configs.stream() 076 .filter(config -> config instanceof KafkaLogConfig) 077 .map(config -> (KafkaLogConfig) config) 078 .collect(Collectors.toList()); 079 if (!kafkaConfigs.isEmpty()) { 080 kafkaManager = new KafkaLogManager(kafkaConfigs); 081 kafkaConfigs.forEach(config -> managers.put(config, kafkaManager)); 082 } 083 } 084 085 protected void findDefaultLogManger() { 086 List<LogConfig> defaultConfigs = configs.stream().filter(LogConfig::isDefault).collect(Collectors.toList()); 087 // use the last default config 088 if (defaultConfigs.isEmpty()) { 089 defaultConfig = configs.get(configs.size() - 1); 090 } else { 091 defaultConfig = defaultConfigs.get(defaultConfigs.size() - 1); 092 } 093 if (defaultConfig instanceof ChronicleLogConfig) { 094 defaultManager = cqManager; 095 } else { 096 defaultManager = kafkaManager; 097 } 098 } 099 100 protected LogManager getManager(Name name) { 101 return managers.get(configs.stream() 102 .filter(config -> config.match(name)) 103 .findFirst() 104 .orElse(defaultConfig)); 105 } 106 107 protected LogManager getManager(Name name, Name group) { 108 return managers.get( 109 configs.stream() 110 .filter(config -> config.match(name, group)) 111 .findFirst() 112 .orElse(defaultConfig)); 113 } 114 115 @Override 116 public boolean exists(Name name) { 117 return getManager(name).exists(name); 118 } 119 120 @Override 121 public boolean createIfNotExists(Name name, int size) { 122 return getManager(name).createIfNotExists(name, size); 123 } 124 125 @Override 126 public boolean delete(Name name) { 127 return getManager(name).delete(name); 128 } 129 130 @Override 131 public int size(Name name) { 132 return getManager(name).size(name); 133 } 134 135 @Override 136 public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) { 137 return getManager(name).getAppender(name, codec); 138 } 139 140 @Override 141 public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions, 142 Codec<M> codec) { 143 if (partitions.isEmpty()) { 144 return defaultManager.createTailer(group, partitions, codec); 145 } 146 Name name = partitions.iterator().next().name(); 147 return getManager(name, group).createTailer(group, partitions, codec); 148 } 149 150 @Override 151 public boolean supportSubscribe() { 152 return defaultManager.supportSubscribe(); 153 } 154 155 @Override 156 public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names, 157 RebalanceListener listener, Codec<M> codec) { 158 Name name = names.iterator().next(); 159 return getManager(name, group).subscribe(group, names, listener, codec); 160 } 161 162 @Override 163 public List<LogLag> getLagPerPartition(Name name, Name group) { 164 return getManager(name, group).getLagPerPartition(name, group); 165 } 166 167 @Override 168 public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec, 169 Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) { 170 return getManager(name, group).getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor); 171 } 172 173 @Override 174 public List<Name> listAllNames() { 175 List<Name> names = new ArrayList<>(); 176 if (kafkaManager != null) { 177 names.addAll(kafkaManager.listAllNames()); 178 } 179 if (cqManager != null) { 180 names.addAll(cqManager.listAllNames()); 181 } 182 return names; 183 } 184 185 @Override 186 public List<Name> listConsumerGroups(Name name) { 187 return getManager(name).listConsumerGroups(name); 188 } 189 190 @Override 191 public void close() { 192 if (kafkaManager != null) { 193 kafkaManager.close(); 194 } 195 if (cqManager != null) { 196 cqManager.close(); 197 } 198 } 199}