001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.internals;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.Map;
028import java.util.Objects;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.function.Function;
032
033import org.nuxeo.lib.stream.log.Latency;
034import org.nuxeo.lib.stream.log.LogAppender;
035import org.nuxeo.lib.stream.log.LogLag;
036import org.nuxeo.lib.stream.log.LogManager;
037import org.nuxeo.lib.stream.log.LogOffset;
038import org.nuxeo.lib.stream.log.LogPartition;
039import org.nuxeo.lib.stream.log.LogRecord;
040import org.nuxeo.lib.stream.log.LogTailer;
041import org.nuxeo.lib.stream.log.RebalanceListener;
042
043public abstract class AbstractLogManager implements LogManager {
044    protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>();
045
046    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>();
047
048    // this define a concurrent set of tailers
049    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>());
050
051    protected abstract void create(String name, int size);
052
053    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name);
054
055    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
056            String group);
057
058    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
059            RebalanceListener listener);
060
061    @Override
062    public synchronized boolean createIfNotExists(String name, int size) {
063        if (!exists(name)) {
064            create(name, size);
065            return true;
066        }
067        return false;
068    }
069
070    @Override
071    public boolean delete(String name) {
072        return false;
073    }
074
075    @Override
076    public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) {
077        partitions.forEach(partition -> checkInvalidAssignment(group, partition));
078        LogTailer<M> ret = doCreateTailer(partitions, group);
079        partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret));
080        tailers.add(ret);
081        return ret;
082    }
083
084    @Override
085    public boolean supportSubscribe() {
086        return false;
087    }
088
089    @Override
090    public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
091            RebalanceListener listener) {
092        LogTailer<M> ret = doSubscribe(group, names, listener);
093        tailers.add(ret);
094        return ret;
095    }
096
097    protected void checkInvalidAssignment(String group, LogPartition partition) {
098        LogPartitionGroup key = new LogPartitionGroup(group, partition);
099        LogTailer ret = tailersAssignments.get(key);
100        if (ret != null && !ret.closed()) {
101            throw new IllegalArgumentException(
102                    "Tailer for this partition already created: " + partition + ", group: " + group);
103        }
104        if (!exists(partition.name())) {
105            throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name());
106        }
107    }
108
109    @SuppressWarnings("unchecked")
110    @Override
111    public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name) {
112        return (LogAppender<M>) appenders.computeIfAbsent(name, n -> {
113            if (exists(n)) {
114                return createAppender(n);
115            }
116            throw new IllegalArgumentException("unknown Log name: " + n);
117        });
118    }
119
120    @Override
121    public <M extends Externalizable> List<Latency> getLatencyPerPartition(String name, String group,
122            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
123        long now = System.currentTimeMillis();
124        List<LogLag> lags = getLagPerPartition(name, group);
125        List<Latency> ret = new ArrayList<>(lags.size());
126        int partition = 0;
127        for (LogLag lag : lags) {
128            if (lag.upper() == 0 || lag.lower() == 0) {
129                // empty partition or the group has not consumed any message
130                ret.add(new Latency(0, now, lag, null));
131                partition++;
132                continue;
133            }
134            // the committed offset point to the next record to process, here we want the last committed offset
135            // which is the previous one
136            LogOffset offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1);
137            try (LogTailer<M> tailer = createTailer("tools", offset.partition())) {
138                tailer.seek(offset);
139                LogRecord<M> record = tailer.read(Duration.ofSeconds(1));
140                if (record == null) {
141                    throw new IllegalStateException("Unable to read " + offset + " lag: " + lag);
142                } else {
143                    try {
144                        long timestamp = timestampExtractor.apply(record.message());
145                        String key = keyExtractor.apply(record.message());
146                        ret.add(new Latency(timestamp, now, lag, key));
147                    } catch (ClassCastException e) {
148                        throw new IllegalStateException("Unexpected record type" + e.getMessage());
149                    }
150                }
151            } catch (InterruptedException e) {
152                Thread.currentThread().interrupt();
153                throw new RuntimeException(e);
154            }
155            partition++;
156        }
157        return ret;
158    }
159
160    @Override
161    public void close() {
162        appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close);
163        appenders.clear();
164        tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close);
165        tailers.clear();
166        tailersAssignments.clear();
167    }
168
169}