001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.List; 026 027import org.nuxeo.lib.stream.log.LogOffset; 028import org.nuxeo.lib.stream.log.LogPartition; 029import org.nuxeo.lib.stream.log.LogRecord; 030import org.nuxeo.lib.stream.log.LogTailer; 031 032/** 033 * A compound tailer to handle multiple partitions. 034 * 035 * @since 9.3 036 */ 037public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> { 038 protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>(); 039 040 protected final String group; 041 042 protected final int size; 043 044 protected final List<LogPartition> logPartitions = new ArrayList<>(); 045 046 protected boolean closed; 047 048 protected long counter; 049 050 public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, String group) { 051 // empty tailers is an accepted input 052 this.tailers.addAll(tailers); 053 this.group = group; 054 this.size = tailers.size(); 055 tailers.forEach(partition -> logPartitions.addAll(partition.assignments())); 056 } 057 058 @Override 059 public LogRecord<M> read(Duration timeout) throws InterruptedException { 060 LogRecord<M> ret = read(); 061 if (ret != null) { 062 return ret; 063 } 064 final long timeoutMs = timeout.toMillis(); 065 final long deadline = System.currentTimeMillis() + timeoutMs; 066 final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs); 067 while (ret == null && System.currentTimeMillis() < deadline) { 068 Thread.sleep(delay); 069 ret = read(); 070 } 071 return ret; 072 } 073 074 protected LogRecord<M> read() { 075 if (size <= 0) { 076 return null; 077 } 078 // round robin on tailers 079 LogRecord<M> ret; 080 long end = counter + size; 081 do { 082 counter++; 083 int i = (int) counter % size; 084 ret = tailers.get(i).read(); 085 if (ret != null) { 086 return ret; 087 } 088 } while (counter < end); 089 return null; 090 } 091 092 @Override 093 public LogOffset commit(LogPartition partition) { 094 for (LogTailer<M> tailer : tailers) { 095 if (tailer.assignments().contains(partition)) { 096 return tailer.commit(partition); 097 } 098 } 099 throw new IllegalArgumentException("No tailer matching: " + partition); 100 } 101 102 @Override 103 public void commit() { 104 tailers.forEach(LogTailer::commit); 105 } 106 107 @Override 108 public void toEnd() { 109 tailers.forEach(ChronicleLogTailer::toEnd); 110 } 111 112 @Override 113 public void toStart() { 114 tailers.forEach(ChronicleLogTailer::toStart); 115 } 116 117 @Override 118 public void toLastCommitted() { 119 tailers.forEach(ChronicleLogTailer::toLastCommitted); 120 } 121 122 @Override 123 public Collection<LogPartition> assignments() { 124 return logPartitions; 125 } 126 127 @Override 128 public String group() { 129 return group; 130 } 131 132 @Override 133 public boolean closed() { 134 return closed; 135 } 136 137 @Override 138 public void seek(LogOffset offset) { 139 for (LogTailer<M> tailer : tailers) { 140 if (tailer.assignments().contains(offset.partition())) { 141 tailer.seek(offset); 142 return; 143 } 144 } 145 // Should be an IllegalArgumentException but Kafka raise a state exception so do the same 146 throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset); 147 } 148 149 @Override 150 public void reset() { 151 tailers.forEach(ChronicleLogTailer::reset); 152 } 153 154 @Override 155 public void reset(LogPartition partition) { 156 ChronicleLogTailer<M> tailer = tailers.stream() 157 .filter(t -> t.assignments().contains(partition)) 158 .findFirst() 159 .orElseThrow(() -> new IllegalArgumentException(String.format( 160 "Cannot reset, partition: %s not found on tailer assignments: %s", 161 partition, logPartitions))); 162 tailer.reset(); 163 } 164 165 @Override 166 public void close() { 167 for (ChronicleLogTailer<M> tailer : tailers) { 168 tailer.close(); 169 } 170 closed = true; 171 } 172}