001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.internals;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.time.Duration;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.function.Function;
034
035import org.nuxeo.lib.stream.StreamRuntimeException;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.log.Latency;
038import org.nuxeo.lib.stream.log.LogAppender;
039import org.nuxeo.lib.stream.log.LogLag;
040import org.nuxeo.lib.stream.log.LogManager;
041import org.nuxeo.lib.stream.log.LogOffset;
042import org.nuxeo.lib.stream.log.LogPartition;
043import org.nuxeo.lib.stream.log.LogRecord;
044import org.nuxeo.lib.stream.log.LogTailer;
045import org.nuxeo.lib.stream.log.RebalanceListener;
046
047public abstract class AbstractLogManager implements LogManager {
048    protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>();
049
050    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>();
051
052    // this define a concurrent set of tailers
053    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>());
054
055    protected abstract void create(String name, int size);
056
057    protected abstract int getSize(String name);
058
059    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec);
060
061    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
062            String group, Codec<M> codec);
063
064    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
065            RebalanceListener listener, Codec<M> codec);
066
067    @Override
068    public synchronized boolean createIfNotExists(String name, int size) {
069        if (!exists(name)) {
070            create(name, size);
071            return true;
072        }
073        return false;
074    }
075
076    @Override
077    public boolean delete(String name) {
078        return false;
079    }
080
081    @Override
082    public int size(String name) {
083        if (appenders.containsKey(name)) {
084            return appenders.get(name).size();
085        }
086        return getSize(name);
087    }
088
089    @Override
090    public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions,
091            Codec<M> codec) {
092        Objects.requireNonNull(codec);
093        partitions.forEach(partition -> checkInvalidAssignment(group, partition));
094        Codec<M> tailerCodec = NO_CODEC.equals(codec) ? guessCodec(partitions) : codec;
095        partitions.forEach(partition -> checkInvalidCodec(partition, tailerCodec));
096        LogTailer<M> ret = doCreateTailer(partitions, group, tailerCodec);
097        partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret));
098        tailers.add(ret);
099        return ret;
100    }
101
102    @SuppressWarnings("unchecked")
103    protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> partitions) {
104        for (LogPartition partition : partitions) {
105            if (appenders.containsKey(partition.name())) {
106                return (Codec<M>) getAppender(partition.name()).getCodec();
107            }
108        }
109        return NO_CODEC;
110    }
111
112    @Override
113    public boolean supportSubscribe() {
114        return false;
115    }
116
117    @Override
118    public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
119            RebalanceListener listener, Codec<M> codec) {
120        Objects.requireNonNull(codec);
121        LogTailer<M> ret = doSubscribe(group, names, listener, codec);
122        tailers.add(ret);
123        return ret;
124    }
125
126    protected void checkInvalidAssignment(String group, LogPartition partition) {
127        LogPartitionGroup key = new LogPartitionGroup(group, partition);
128        LogTailer ret = tailersAssignments.get(key);
129        if (ret != null && !ret.closed()) {
130            throw new IllegalArgumentException(
131                    "Tailer for this partition already created: " + partition + ", group: " + group);
132        }
133        if (!exists(partition.name())) {
134            throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name());
135        }
136    }
137
138    @SuppressWarnings("unchecked")
139    protected void checkInvalidCodec(LogPartition partition, Codec codec) {
140        if (appenders.containsKey(partition.name())) {
141            getAppender(partition.name(), codec);
142        }
143    }
144
145    @SuppressWarnings("unchecked")
146    @Override
147    public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name, Codec<M> codec) {
148        LogAppender<M> ret = (LogAppender<M>) appenders.computeIfAbsent(name, n -> {
149            if (exists(n)) {
150                return createAppender(n, codec);
151            }
152            throw new IllegalArgumentException("Unknown Log name: " + n);
153        });
154        if (NO_CODEC.equals(codec) || sameCodec(ret.getCodec(), codec)) {
155            return ret;
156        }
157        throw new IllegalArgumentException(String.format(
158                "The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", name,
159                ret.getCodec(), codec));
160    }
161
162    protected boolean sameCodec(Codec codec1, Codec codec2) {
163        return codec1 == codec2
164                || !NO_CODEC.equals(codec1) && !NO_CODEC.equals(codec2) && codec1.getClass().isInstance(codec2);
165    }
166
167    @Override
168    public <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group, Codec<M> codec,
169            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
170        long now = System.currentTimeMillis();
171        List<LogLag> lags = getLagPerPartition(name, group);
172        List<Latency> ret = new ArrayList<>(lags.size());
173        int partition = 0;
174        for (LogLag lag : lags) {
175            if (lag.upper() == 0 || lag.lower() == 0) {
176                // empty partition or the group has not consumed any message
177                ret.add(new Latency(0, now, lag, null));
178                partition++;
179                continue;
180            }
181            // the committed offset point to the next record to process, here we want the last committed offset
182            // which is the previous one
183            LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1);
184            try (LogTailer<M> tailer = createTailer("tools", offset.partition(), codec)) {
185                tailer.seek(offset);
186                LogRecord<M> record = tailer.read(Duration.ofSeconds(1));
187                if (record == null) {
188                    throw new IllegalStateException("Unable to read " + offset + " lag: " + lag);
189                } else {
190                    try {
191                        long timestamp = timestampExtractor.apply(record.message());
192                        String key = keyExtractor.apply(record.message());
193                        ret.add(new Latency(timestamp, now, lag, key));
194                    } catch (ClassCastException e) {
195                        throw new IllegalStateException("Unexpected record type" + e.getMessage());
196                    }
197                }
198            } catch (InterruptedException e) {
199                Thread.currentThread().interrupt();
200                throw new StreamRuntimeException(e);
201            }
202            partition++;
203        }
204        return ret;
205    }
206
207    @Override
208    public void close() {
209        appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close);
210        appenders.clear();
211        tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close);
212        tailers.clear();
213        tailersAssignments.clear();
214    }
215
216}