001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.List;
026
027import org.nuxeo.lib.stream.log.LogOffset;
028import org.nuxeo.lib.stream.log.LogPartition;
029import org.nuxeo.lib.stream.log.LogRecord;
030import org.nuxeo.lib.stream.log.LogTailer;
031
032/**
033 * A compound tailer to handle multiple partitions.
034 *
035 * @since 9.3
036 */
037public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> {
038    protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>();
039
040    protected final String group;
041
042    protected final int size;
043
044    protected final List<LogPartition> logPartitions = new ArrayList<>();
045
046    protected boolean closed;
047
048    protected long counter;
049
050    public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, String group) {
051        // empty tailers is an accepted input
052        this.tailers.addAll(tailers);
053        this.group = group;
054        this.size = tailers.size();
055        tailers.forEach(partition -> logPartitions.addAll(partition.assignments()));
056    }
057
058    @Override
059    public LogRecord<M> read(Duration timeout) throws InterruptedException {
060        LogRecord<M> ret = read();
061        if (ret != null) {
062            return ret;
063        }
064        final long timeoutMs = timeout.toMillis();
065        final long deadline = System.currentTimeMillis() + timeoutMs;
066        final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs);
067        while (ret == null && System.currentTimeMillis() < deadline) {
068            Thread.sleep(delay);
069            ret = read();
070        }
071        return ret;
072    }
073
074    protected LogRecord<M> read() {
075        if (size <= 0) {
076            return null;
077        }
078        // round robin on tailers
079        LogRecord<M> ret;
080        long end = counter + size;
081        do {
082            counter++;
083            int i = (int) counter % size;
084            ret = tailers.get(i).read();
085            if (ret != null) {
086                return ret;
087            }
088        } while (counter < end);
089        return null;
090    }
091
092    @Override
093    public LogOffset commit(LogPartition partition) {
094        for (LogTailer<M> tailer : tailers) {
095            if (tailer.assignments().contains(partition)) {
096                return tailer.commit(partition);
097            }
098        }
099        throw new IllegalArgumentException("No tailer matching: " + partition);
100    }
101
102    @Override
103    public void commit() {
104        tailers.forEach(LogTailer::commit);
105    }
106
107    @Override
108    public void toEnd() {
109        tailers.forEach(ChronicleLogTailer::toEnd);
110    }
111
112    @Override
113    public void toStart() {
114        tailers.forEach(ChronicleLogTailer::toStart);
115    }
116
117    @Override
118    public void toLastCommitted() {
119        tailers.forEach(ChronicleLogTailer::toLastCommitted);
120    }
121
122    @Override
123    public Collection<LogPartition> assignments() {
124        return logPartitions;
125    }
126
127    @Override
128    public String group() {
129        return group;
130    }
131
132    @Override
133    public boolean closed() {
134        return closed;
135    }
136
137    @Override
138    public void seek(LogOffset offset) {
139        for (LogTailer<M> tailer : tailers) {
140            if (tailer.assignments().contains(offset.partition())) {
141                tailer.seek(offset);
142                return;
143            }
144        }
145        // Should be an IllegalArgumentException but Kafka raise a state exception so do the same
146        throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
147    }
148
149    @Override
150    public void reset() {
151        tailers.forEach(ChronicleLogTailer::reset);
152    }
153
154    @Override
155    public void reset(LogPartition partition) {
156        ChronicleLogTailer<M> tailer = tailers.stream()
157                                              .filter(t -> t.assignments().contains(partition))
158                                              .findFirst()
159                                              .orElseThrow(() -> new IllegalArgumentException(String.format(
160                                                      "Cannot reset, partition: %s not found on tailer assignments: %s",
161                                                      partition, logPartitions)));
162        tailer.reset();
163    }
164
165    @Override
166    public void close() {
167        for (ChronicleLogTailer<M> tailer : tailers) {
168            tailer.close();
169        }
170        closed = true;
171    }
172}