001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.List; 026 027import org.nuxeo.lib.stream.codec.Codec; 028import org.nuxeo.lib.stream.log.LogOffset; 029import org.nuxeo.lib.stream.log.LogPartition; 030import org.nuxeo.lib.stream.log.LogRecord; 031import org.nuxeo.lib.stream.log.LogTailer; 032import org.nuxeo.lib.stream.log.Name; 033 034/** 035 * A compound tailer to handle multiple partitions. 036 * 037 * @since 9.3 038 */ 039public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> { 040 protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>(); 041 042 protected final Name group; 043 044 protected final int size; 045 046 protected final List<LogPartition> logPartitions = new ArrayList<>(); 047 048 protected final Codec<M> codec; 049 050 protected boolean closed; 051 052 protected long counter; 053 054 public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, Name group) { 055 // empty tailers is an accepted input 056 this.tailers.addAll(tailers); 057 this.group = group; 058 this.size = tailers.size(); 059 if (tailers.isEmpty()) { 060 this.codec = null; 061 } else { 062 this.codec = tailers.iterator().next().getCodec(); 063 } 064 tailers.forEach(partition -> logPartitions.addAll(partition.assignments())); 065 } 066 067 @Override 068 public LogRecord<M> read(Duration timeout) throws InterruptedException { 069 LogRecord<M> ret = read(); 070 if (ret != null) { 071 return ret; 072 } 073 final long timeoutMs = timeout.toMillis(); 074 final long deadline = System.currentTimeMillis() + timeoutMs; 075 final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs); 076 while (ret == null && System.currentTimeMillis() < deadline) { 077 Thread.sleep(delay); 078 ret = read(); 079 } 080 return ret; 081 } 082 083 protected LogRecord<M> read() { 084 if (size <= 0) { 085 return null; 086 } 087 // round robin on tailers 088 LogRecord<M> ret; 089 long end = counter + size; 090 do { 091 counter++; 092 int i = (int) counter % size; 093 ret = tailers.get(i).read(); 094 if (ret != null) { 095 return ret; 096 } 097 } while (counter < end); 098 return null; 099 } 100 101 @Override 102 public LogOffset commit(LogPartition partition) { 103 for (LogTailer<M> tailer : tailers) { 104 if (tailer.assignments().contains(partition)) { 105 return tailer.commit(partition); 106 } 107 } 108 throw new IllegalArgumentException("No tailer matching: " + partition); 109 } 110 111 @Override 112 public void commit() { 113 tailers.forEach(LogTailer::commit); 114 } 115 116 @Override 117 public void toEnd() { 118 tailers.forEach(ChronicleLogTailer::toEnd); 119 } 120 121 @Override 122 public void toStart() { 123 tailers.forEach(ChronicleLogTailer::toStart); 124 } 125 126 @Override 127 public void toLastCommitted() { 128 tailers.forEach(ChronicleLogTailer::toLastCommitted); 129 } 130 131 @Override 132 public Collection<LogPartition> assignments() { 133 return logPartitions; 134 } 135 136 @Override 137 public Name group() { 138 return group; 139 } 140 141 @Override 142 public boolean closed() { 143 return closed; 144 } 145 146 @Override 147 public Codec<M> getCodec() { 148 return codec; 149 } 150 151 @Override 152 public void seek(LogOffset offset) { 153 for (LogTailer<M> tailer : tailers) { 154 if (tailer.assignments().contains(offset.partition())) { 155 tailer.seek(offset); 156 return; 157 } 158 } 159 // Should be an IllegalArgumentException but Kafka raise a state exception so do the same 160 throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset); 161 } 162 163 @Override 164 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 165 throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp"); 166 } 167 168 @Override 169 public void reset() { 170 tailers.forEach(ChronicleLogTailer::reset); 171 } 172 173 @Override 174 public void reset(LogPartition partition) { 175 ChronicleLogTailer<M> tailer = tailers.stream() 176 .filter(t -> t.assignments().contains(partition)) 177 .findFirst() 178 .orElseThrow(() -> new IllegalArgumentException(String.format( 179 "Cannot reset, partition: %s not found on tailer assignments: %s", 180 partition, logPartitions))); 181 tailer.reset(); 182 } 183 184 @Override 185 public void close() { 186 for (ChronicleLogTailer<M> tailer : tailers) { 187 tailer.close(); 188 } 189 closed = true; 190 } 191 192}