001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.internals;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.time.Duration;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.function.Function;
034
035import org.nuxeo.lib.stream.StreamRuntimeException;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.log.Latency;
038import org.nuxeo.lib.stream.log.LogAppender;
039import org.nuxeo.lib.stream.log.LogLag;
040import org.nuxeo.lib.stream.log.LogManager;
041import org.nuxeo.lib.stream.log.LogOffset;
042import org.nuxeo.lib.stream.log.LogPartition;
043import org.nuxeo.lib.stream.log.LogRecord;
044import org.nuxeo.lib.stream.log.LogTailer;
045import org.nuxeo.lib.stream.log.Name;
046import org.nuxeo.lib.stream.log.RebalanceListener;
047
048public abstract class AbstractLogManager implements LogManager {
049    protected static final Name ADMIN_GROUP = Name.of("admin", "tailer");
050
051    protected final Map<Name, CloseableLogAppender> appenders = new ConcurrentHashMap<>();
052
053    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>();
054
055    // this define a concurrent set of tailers
056    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<>());
057
058    protected abstract void create(Name name, int size);
059
060    protected abstract int getSize(Name name);
061
062    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec);
063
064    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
065            Name group, Codec<M> codec);
066
067    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
068            RebalanceListener listener, Codec<M> codec);
069
070    public abstract List<LogLag> getLagPerPartition(Name name, Name group);
071
072    @Override
073    public synchronized boolean createIfNotExists(Name name, int size) {
074        if (!exists(name)) {
075            create(name, size);
076            return true;
077        }
078        return false;
079    }
080
081    @Override
082    public boolean delete(Name name) {
083        return false;
084    }
085
086    @Override
087    public int size(Name name) {
088        if (appenders.containsKey(name)) {
089            return appenders.get(name).size();
090        }
091        return getSize(name);
092    }
093
094    @Override
095    public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions,
096            Codec<M> codec) {
097        Objects.requireNonNull(codec);
098        partitions.forEach(partition -> checkInvalidAssignment(group, partition));
099        Codec<M> tailerCodec = NO_CODEC.equals(codec) ? guessCodec(partitions) : codec;
100        partitions.forEach(partition -> checkInvalidCodec(partition, tailerCodec));
101        LogTailer<M> ret = doCreateTailer(partitions, group, tailerCodec);
102        cleanTailers();
103        partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret));
104        tailers.add(ret);
105        return ret;
106    }
107
108    @SuppressWarnings("unchecked")
109    protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> partitions) {
110        for (LogPartition partition : partitions) {
111            if (appenders.containsKey(partition.name())) {
112                return (Codec<M>) getAppender(partition.name()).getCodec();
113            }
114        }
115        return NO_CODEC;
116    }
117
118    @Override
119    public boolean supportSubscribe() {
120        return false;
121    }
122
123    @Override
124    public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names,
125            RebalanceListener listener, Codec<M> codec) {
126        Objects.requireNonNull(codec);
127        LogTailer<M> ret = doSubscribe(group, names, listener, codec);
128        cleanTailers();
129        tailers.add(ret);
130        return ret;
131    }
132
133    protected void checkInvalidAssignment(Name group, LogPartition partition) {
134        LogPartitionGroup key = new LogPartitionGroup(group, partition);
135        LogTailer<?> ret = tailersAssignments.get(key);
136        if (ret != null && !ret.closed()) {
137            throw new IllegalArgumentException(
138                    "Tailer for this partition already created: " + partition + ", group: " + group);
139        }
140        if (!exists(partition.name())) {
141            throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name());
142        }
143    }
144
145    protected <M extends Externalizable> void checkInvalidCodec(LogPartition partition, Codec<M> codec) {
146        if (appenders.containsKey(partition.name())) {
147            getAppender(partition.name(), codec);
148        }
149    }
150
151    @SuppressWarnings("unchecked")
152    @Override
153    public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) {
154        LogAppender<M> ret = appenders.computeIfAbsent(name, n -> {
155            if (exists(n)) {
156                return createAppender(n, codec);
157            }
158            throw new IllegalArgumentException("Unknown Log name: " + n);
159        });
160        if (NO_CODEC.equals(codec) || sameCodec(ret.getCodec(), codec)) {
161            return ret;
162        }
163        throw new IllegalArgumentException(String.format(
164                "The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", name,
165                ret.getCodec(), codec));
166    }
167
168    protected <M extends Externalizable> boolean sameCodec(Codec<M> codec1, Codec<M> codec2) {
169        return codec1 == codec2
170                || !NO_CODEC.equals(codec1) && !NO_CODEC.equals(codec2) && codec1.getClass().isInstance(codec2);
171    }
172
173    @Override
174    public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec,
175            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
176        long now = System.currentTimeMillis();
177        List<LogLag> lags = getLagPerPartition(name, group);
178        List<Latency> ret = new ArrayList<>(lags.size());
179        int partition = 0;
180        for (LogLag lag : lags) {
181            if (lag.upper() == 0 || lag.lower() == 0 || lag.lag() == 0) {
182                // empty partition or the group has not consumed any message or there is no lag
183                ret.add(new Latency(0, now, lag, null));
184                partition++;
185                continue;
186            }
187            // the committed offset point to the next record to process, here we want the last committed offset
188            // which is the previous one
189            LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1);
190            try (LogTailer<M> tailer = createTailer(ADMIN_GROUP, offset.partition(), codec)) {
191                tailer.seek(offset);
192                LogRecord<M> record = tailer.read(Duration.ofSeconds(1));
193                if (record == null) {
194                    // the beginning of the partition is not necessary offset 0 after retention policy is applied
195                    ret.add(new Latency(0, now, lag, null));
196                } else {
197                    long timestamp = timestampExtractor.apply(record.message());
198                    String key = keyExtractor.apply(record.message());
199                    ret.add(new Latency(timestamp, now, lag, key));
200                }
201            } catch (ClassCastException e) {
202                throw new IllegalStateException("Unexpected record type" + e.getMessage());
203            } catch (InterruptedException e) {
204                Thread.currentThread().interrupt();
205                throw new StreamRuntimeException(e);
206            }
207            partition++;
208        }
209        return ret;
210    }
211
212    protected void cleanTailers() {
213        tailers.removeIf(LogTailer::closed);
214        tailersAssignments.values().removeIf(LogTailer::closed);
215    }
216
217    @Override
218    public void close() {
219        appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close);
220        appenders.clear();
221        tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close);
222        tailers.clear();
223        tailersAssignments.clear();
224    }
225
226}