001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log;
020
021import java.io.Externalizable;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.function.Function;
028import java.util.stream.Collectors;
029
030import org.nuxeo.lib.stream.codec.Codec;
031import org.nuxeo.lib.stream.log.chronicle.ChronicleLogConfig;
032import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
033import org.nuxeo.lib.stream.log.kafka.KafkaLogConfig;
034import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
035
036/**
037 * @since 11.1
038 */
039public class UnifiedLogManager implements LogManager {
040
041    protected final List<LogConfig> configs;
042
043    protected LogManager cqManager;
044
045    protected LogManager kafkaManager;
046
047    protected LogManager defaultManager;
048
049    protected LogConfig defaultConfig;
050
051    protected Map<LogConfig, LogManager> managers = new HashMap<>();
052
053    public UnifiedLogManager(List<LogConfig> configs) {
054        if (configs == null || configs.isEmpty()) {
055            throw new IllegalArgumentException("No LogConfig provided");
056        }
057        this.configs = configs;
058        createCQLogManager();
059        createKafkaLogManager();
060        findDefaultLogManger();
061    }
062
063    protected void createCQLogManager() {
064        List<ChronicleLogConfig> cqConfigs = configs.stream()
065                                                    .filter(config -> config instanceof ChronicleLogConfig)
066                                                    .map(config -> (ChronicleLogConfig) config)
067                                                    .collect(Collectors.toList());
068        if (!cqConfigs.isEmpty()) {
069            cqManager = new ChronicleLogManager(cqConfigs);
070            cqConfigs.forEach(config -> managers.put(config, cqManager));
071        }
072    }
073
074    protected void createKafkaLogManager() {
075        List<KafkaLogConfig> kafkaConfigs = configs.stream()
076                                                   .filter(config -> config instanceof KafkaLogConfig)
077                                                   .map(config -> (KafkaLogConfig) config)
078                                                   .collect(Collectors.toList());
079        if (!kafkaConfigs.isEmpty()) {
080            kafkaManager = new KafkaLogManager(kafkaConfigs);
081            kafkaConfigs.forEach(config -> managers.put(config, kafkaManager));
082        }
083    }
084
085    protected void findDefaultLogManger() {
086        List<LogConfig> defaultConfigs = configs.stream().filter(LogConfig::isDefault).collect(Collectors.toList());
087        // use the last default config
088        if (defaultConfigs.isEmpty()) {
089            defaultConfig = configs.get(configs.size() - 1);
090        } else {
091            defaultConfig = defaultConfigs.get(defaultConfigs.size() - 1);
092        }
093        if (defaultConfig instanceof ChronicleLogConfig) {
094            defaultManager = cqManager;
095        } else {
096            defaultManager = kafkaManager;
097        }
098    }
099
100    protected LogManager getManager(Name name) {
101        return managers.get(configs.stream()
102                                   .filter(config -> config.match(name))
103                                   .findFirst()
104                                   .orElse(defaultConfig));
105    }
106
107    protected LogManager getManager(Name name, Name group) {
108        return managers.get(
109                configs.stream()
110                       .filter(config -> config.match(name, group))
111                       .findFirst()
112                       .orElse(defaultConfig));
113    }
114
115    @Override
116    public boolean exists(Name name) {
117        return getManager(name).exists(name);
118    }
119
120    @Override
121    public boolean createIfNotExists(Name name, int size) {
122        return getManager(name).createIfNotExists(name, size);
123    }
124
125    @Override
126    public boolean delete(Name name) {
127        return getManager(name).delete(name);
128    }
129
130    @Override
131    public int size(Name name) {
132        return getManager(name).size(name);
133    }
134
135    @Override
136    public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) {
137        return getManager(name).getAppender(name, codec);
138    }
139
140    @Override
141    public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions,
142            Codec<M> codec) {
143        if (partitions.isEmpty()) {
144            return defaultManager.createTailer(group, partitions, codec);
145        }
146        Name name = partitions.iterator().next().name();
147        return getManager(name, group).createTailer(group, partitions, codec);
148    }
149
150    @Override
151    public boolean supportSubscribe() {
152        return defaultManager.supportSubscribe();
153    }
154
155    @Override
156    public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names,
157            RebalanceListener listener, Codec<M> codec) {
158        Name name = names.iterator().next();
159        return getManager(name, group).subscribe(group, names, listener, codec);
160    }
161
162    @Override
163    public List<LogLag> getLagPerPartition(Name name, Name group) {
164        return getManager(name, group).getLagPerPartition(name, group);
165    }
166
167    @Override
168    public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec,
169            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
170        return getManager(name, group).getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor);
171    }
172
173    @Override
174    public List<Name> listAllNames() {
175        List<Name> names = new ArrayList<>();
176        if (kafkaManager != null) {
177            names.addAll(kafkaManager.listAllNames());
178        }
179        if (cqManager != null) {
180            names.addAll(cqManager.listAllNames());
181        }
182        return names;
183    }
184
185    @Override
186    public List<Name> listConsumerGroups(Name name) {
187        return getManager(name).listConsumerGroups(name);
188    }
189
190    @Override
191    public void close() {
192        if (kafkaManager != null) {
193            kafkaManager.close();
194        }
195        if (cqManager != null) {
196            cqManager.close();
197        }
198    }
199}