001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.List;
026
027import org.nuxeo.lib.stream.codec.Codec;
028import org.nuxeo.lib.stream.log.LogOffset;
029import org.nuxeo.lib.stream.log.LogPartition;
030import org.nuxeo.lib.stream.log.LogRecord;
031import org.nuxeo.lib.stream.log.LogTailer;
032import org.nuxeo.lib.stream.log.Name;
033
034/**
035 * A compound tailer to handle multiple partitions.
036 *
037 * @since 9.3
038 */
039public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> {
040    protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>();
041
042    protected final Name group;
043
044    protected final int size;
045
046    protected final List<LogPartition> logPartitions = new ArrayList<>();
047
048    protected final Codec<M> codec;
049
050    protected boolean closed;
051
052    protected long counter;
053
054    public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, Name group) {
055        // empty tailers is an accepted input
056        this.tailers.addAll(tailers);
057        this.group = group;
058        this.size = tailers.size();
059        if (tailers.isEmpty()) {
060            this.codec = null;
061        } else {
062            this.codec = tailers.iterator().next().getCodec();
063        }
064        tailers.forEach(partition -> logPartitions.addAll(partition.assignments()));
065    }
066
067    @Override
068    public LogRecord<M> read(Duration timeout) throws InterruptedException {
069        LogRecord<M> ret = read();
070        if (ret != null) {
071            return ret;
072        }
073        final long timeoutMs = timeout.toMillis();
074        final long deadline = System.currentTimeMillis() + timeoutMs;
075        final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs);
076        while (ret == null && System.currentTimeMillis() < deadline) {
077            Thread.sleep(delay);
078            ret = read();
079        }
080        return ret;
081    }
082
083    protected LogRecord<M> read() {
084        if (size <= 0) {
085            return null;
086        }
087        // round robin on tailers
088        LogRecord<M> ret;
089        long end = counter + size;
090        do {
091            counter++;
092            int i = (int) counter % size;
093            ret = tailers.get(i).read();
094            if (ret != null) {
095                return ret;
096            }
097        } while (counter < end);
098        return null;
099    }
100
101    @Override
102    public LogOffset commit(LogPartition partition) {
103        for (LogTailer<M> tailer : tailers) {
104            if (tailer.assignments().contains(partition)) {
105                return tailer.commit(partition);
106            }
107        }
108        throw new IllegalArgumentException("No tailer matching: " + partition);
109    }
110
111    @Override
112    public void commit() {
113        tailers.forEach(LogTailer::commit);
114    }
115
116    @Override
117    public void toEnd() {
118        tailers.forEach(ChronicleLogTailer::toEnd);
119    }
120
121    @Override
122    public void toStart() {
123        tailers.forEach(ChronicleLogTailer::toStart);
124    }
125
126    @Override
127    public void toLastCommitted() {
128        tailers.forEach(ChronicleLogTailer::toLastCommitted);
129    }
130
131    @Override
132    public Collection<LogPartition> assignments() {
133        return logPartitions;
134    }
135
136    @Override
137    public Name group() {
138        return group;
139    }
140
141    @Override
142    public boolean closed() {
143        return closed;
144    }
145
146    @Override
147    public Codec<M> getCodec() {
148        return codec;
149    }
150
151    @Override
152    public void seek(LogOffset offset) {
153        for (LogTailer<M> tailer : tailers) {
154            if (tailer.assignments().contains(offset.partition())) {
155                tailer.seek(offset);
156                return;
157            }
158        }
159        // Should be an IllegalArgumentException but Kafka raise a state exception so do the same
160        throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
161    }
162
163    @Override
164    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
165        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
166    }
167
168    @Override
169    public void reset() {
170        tailers.forEach(ChronicleLogTailer::reset);
171    }
172
173    @Override
174    public void reset(LogPartition partition) {
175        ChronicleLogTailer<M> tailer = tailers.stream()
176                                              .filter(t -> t.assignments().contains(partition))
177                                              .findFirst()
178                                              .orElseThrow(() -> new IllegalArgumentException(String.format(
179                                                      "Cannot reset, partition: %s not found on tailer assignments: %s",
180                                                      partition, logPartitions)));
181        tailer.reset();
182    }
183
184    @Override
185    public void close() {
186        for (ChronicleLogTailer<M> tailer : tailers) {
187            tailer.close();
188        }
189        closed = true;
190    }
191
192}