/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;

/**
 * A compound tailer to handle multiple partitions.
 * <p>
 * It wraps a list of {@link ChronicleLogTailer} and reads from them in a round robin fashion. Operations that target
 * a single partition are delegated to the first wrapped tailer whose assignments contain that partition.
 *
 * @since 9.3
 */
public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> {

    /** The wrapped tailers, in the iteration order of the collection given to the constructor. */
    protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>();

    /** The consumer group name returned by {@link #group()}. */
    protected final String group;

    /** Number of wrapped tailers, may be zero. */
    protected final int size;

    /** Concatenation of the assignments of every wrapped tailer. */
    protected final List<LogPartition> logPartitions = new ArrayList<>();

    /** Codec of the first wrapped tailer, {@code null} when there is no tailer. */
    protected final Codec<M> codec;

    /** Set to {@code true} once {@link #close()} has closed all the wrapped tailers. */
    protected boolean closed;

    /** Round robin cursor, incremented each time a wrapped tailer is polled. */
    protected long counter;

    /**
     * Creates a compound tailer on top of the given tailers.
     *
     * @param tailers the tailers to wrap, an empty collection is accepted
     * @param group the consumer group name
     */
    public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> tailers, String group) {
        // empty tailers is an accepted input
        this.tailers.addAll(tailers);
        this.group = group;
        this.size = this.tailers.size();
        this.codec = this.tailers.isEmpty() ? null : this.tailers.get(0).getCodec();
        for (ChronicleLogTailer<M> tailer : this.tailers) {
            logPartitions.addAll(tailer.assignments());
        }
    }

    /**
     * Reads the next available record, polling the wrapped tailers until a record is found or the timeout expires.
     *
     * @param timeout the maximum time to wait for a record
     * @return a record, or {@code null} if none was available before the timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        LogRecord<M> ret = read();
        if (ret != null) {
            return ret;
        }
        final long timeoutMs = timeout.toMillis();
        final long deadline = System.currentTimeMillis() + timeoutMs;
        final long delay = Math.min(ChronicleLogTailer.POLL_INTERVAL_MS, timeoutMs);
        long remaining = deadline - System.currentTimeMillis();
        while (remaining > 0) {
            // never sleep past the deadline, so that the call does not overshoot the requested timeout
            Thread.sleep(Math.min(delay, remaining));
            ret = read();
            if (ret != null) {
                return ret;
            }
            remaining = deadline - System.currentTimeMillis();
        }
        return null;
    }

    /**
     * Polls each wrapped tailer at most once, resuming after the tailer polled last, and returns the first record
     * found.
     *
     * @return a record, or {@code null} if there is no tailer or if none of them returned a record
     */
    protected LogRecord<M> read() {
        if (size <= 0) {
            return null;
        }
        // round robin on tailers
        final long end = counter + size;
        do {
            counter++;
            // Apply the modulo on the long value and only then narrow to int: "(int) counter % size" narrows first,
            // which turns negative once the counter exceeds Integer.MAX_VALUE and produces a negative index.
            int i = (int) (counter % size);
            LogRecord<M> ret = tailers.get(i).read();
            if (ret != null) {
                return ret;
            }
        } while (counter < end);
        return null;
    }

    /**
     * Commits the given partition using the first wrapped tailer assigned to it.
     *
     * @param partition the partition to commit
     * @return the offset returned by the wrapped tailer
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public LogOffset commit(LogPartition partition) {
        for (LogTailer<M> tailer : tailers) {
            if (tailer.assignments().contains(partition)) {
                return tailer.commit(partition);
            }
        }
        throw new IllegalArgumentException("No tailer matching: " + partition);
    }

    /** Calls {@code commit()} on every wrapped tailer. */
    @Override
    public void commit() {
        tailers.forEach(LogTailer::commit);
    }

    /** Calls {@code toEnd()} on every wrapped tailer. */
    @Override
    public void toEnd() {
        tailers.forEach(ChronicleLogTailer::toEnd);
    }

    /** Calls {@code toStart()} on every wrapped tailer. */
    @Override
    public void toStart() {
        tailers.forEach(ChronicleLogTailer::toStart);
    }

    /** Calls {@code toLastCommitted()} on every wrapped tailer. */
    @Override
    public void toLastCommitted() {
        tailers.forEach(ChronicleLogTailer::toLastCommitted);
    }

    /**
     * @return the partitions of all the wrapped tailers; this is the internal list, not a copy
     */
    @Override
    public Collection<LogPartition> assignments() {
        return logPartitions;
    }

    /**
     * @return the consumer group name given to the constructor
     */
    @Override
    public String group() {
        return group;
    }

    /**
     * @return {@code true} once {@link #close()} has completed
     */
    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * @return the codec of the first wrapped tailer, or {@code null} if there is no tailer
     */
    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    /**
     * Seeks to the given offset using the first wrapped tailer assigned to the partition of the offset.
     *
     * @param offset the offset to seek to
     * @throws IllegalStateException if no wrapped tailer is assigned to the partition of the offset
     */
    @Override
    public void seek(LogOffset offset) {
        for (LogTailer<M> tailer : tailers) {
            if (tailer.assignments().contains(offset.partition())) {
                tailer.seek(offset);
                return;
            }
        }
        // Should be an IllegalArgumentException but Kafka raise a state exception so do the same
        throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
    }

    /** Calls {@code reset()} on every wrapped tailer. */
    @Override
    public void reset() {
        tailers.forEach(ChronicleLogTailer::reset);
    }

    /**
     * Resets the first wrapped tailer assigned to the given partition.
     *
     * @param partition the partition to reset
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public void reset(LogPartition partition) {
        ChronicleLogTailer<M> tailer = tailers.stream()
                                              .filter(t -> t.assignments().contains(partition))
                                              .findFirst()
                                              .orElseThrow(() -> new IllegalArgumentException(String.format(
                                                      "Cannot reset, partition: %s not found on tailer assignments: %s",
                                                      partition, logPartitions)));
        tailer.reset();
    }

    /** Closes every wrapped tailer, then flags this compound tailer as closed. */
    @Override
    public void close() {
        for (ChronicleLogTailer<M> tailer : tailers) {
            tailer.close();
        }
        closed = true;
    }

}