001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.List;
026
027import org.nuxeo.lib.stream.codec.Codec;
028import org.nuxeo.lib.stream.log.LogOffset;
029import org.nuxeo.lib.stream.log.LogPartition;
030import org.nuxeo.lib.stream.log.LogRecord;
031import org.nuxeo.lib.stream.log.LogTailer;
032
033/**
034 * A compound tailer to handle multiple partitions.
035 *
036 * @since 9.3
037 */
038public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> {
039    protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>();
040
041    protected final String group;
042
043    protected final int size;
044
045    protected final List<LogPartition> logPartitions = new ArrayList<>();
046
047    protected final Codec<M> codec;
048
049    protected boolean closed;
050
051    protected long counter;
052
053    public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, String group) {
054        // empty tailers is an accepted input
055        this.tailers.addAll(tailers);
056        this.group = group;
057        this.size = tailers.size();
058        if (tailers.isEmpty()) {
059            this.codec = null;
060        } else {
061            this.codec = tailers.iterator().next().getCodec();
062        }
063        tailers.forEach(partition -> logPartitions.addAll(partition.assignments()));
064    }
065
066    @Override
067    public LogRecord<M> read(Duration timeout) throws InterruptedException {
068        LogRecord<M> ret = read();
069        if (ret != null) {
070            return ret;
071        }
072        final long timeoutMs = timeout.toMillis();
073        final long deadline = System.currentTimeMillis() + timeoutMs;
074        final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs);
075        while (ret == null && System.currentTimeMillis() < deadline) {
076            Thread.sleep(delay);
077            ret = read();
078        }
079        return ret;
080    }
081
082    protected LogRecord<M> read() {
083        if (size <= 0) {
084            return null;
085        }
086        // round robin on tailers
087        LogRecord<M> ret;
088        long end = counter + size;
089        do {
090            counter++;
091            int i = (int) counter % size;
092            ret = tailers.get(i).read();
093            if (ret != null) {
094                return ret;
095            }
096        } while (counter < end);
097        return null;
098    }
099
100    @Override
101    public LogOffset commit(LogPartition partition) {
102        for (LogTailer<M> tailer : tailers) {
103            if (tailer.assignments().contains(partition)) {
104                return tailer.commit(partition);
105            }
106        }
107        throw new IllegalArgumentException("No tailer matching: " + partition);
108    }
109
110    @Override
111    public void commit() {
112        tailers.forEach(LogTailer::commit);
113    }
114
115    @Override
116    public void toEnd() {
117        tailers.forEach(ChronicleLogTailer::toEnd);
118    }
119
120    @Override
121    public void toStart() {
122        tailers.forEach(ChronicleLogTailer::toStart);
123    }
124
125    @Override
126    public void toLastCommitted() {
127        tailers.forEach(ChronicleLogTailer::toLastCommitted);
128    }
129
130    @Override
131    public Collection<LogPartition> assignments() {
132        return logPartitions;
133    }
134
135    @Override
136    public String group() {
137        return group;
138    }
139
140    @Override
141    public boolean closed() {
142        return closed;
143    }
144
145    @Override
146    public Codec<M> getCodec() {
147        return codec;
148    }
149
150    @Override
151    public void seek(LogOffset offset) {
152        for (LogTailer<M> tailer : tailers) {
153            if (tailer.assignments().contains(offset.partition())) {
154                tailer.seek(offset);
155                return;
156            }
157        }
158        // Should be an IllegalArgumentException but Kafka raise a state exception so do the same
159        throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
160    }
161
162    @Override
163    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
164        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
165    }
166
167    @Override
168    public void reset() {
169        tailers.forEach(ChronicleLogTailer::reset);
170    }
171
172    @Override
173    public void reset(LogPartition partition) {
174        ChronicleLogTailer<M> tailer = tailers.stream()
175                                              .filter(t -> t.assignments().contains(partition))
176                                              .findFirst()
177                                              .orElseThrow(() -> new IllegalArgumentException(String.format(
178                                                      "Cannot reset, partition: %s not found on tailer assignments: %s",
179                                                      partition, logPartitions)));
180        tailer.reset();
181    }
182
183    @Override
184    public void close() {
185        for (ChronicleLogTailer<M> tailer : tailers) {
186            tailer.close();
187        }
188        closed = true;
189    }
190
191}