001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY; 023 024import java.io.Externalizable; 025import java.time.Duration; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.Objects; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.lib.stream.codec.Codec; 037import org.nuxeo.lib.stream.log.LogOffset; 038import org.nuxeo.lib.stream.log.LogPartition; 039import org.nuxeo.lib.stream.log.LogRecord; 040import org.nuxeo.lib.stream.log.LogTailer; 041import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 042import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 043 044import net.openhft.chronicle.queue.ExcerptTailer; 045import net.openhft.chronicle.queue.TailerState; 046 047/** 048 * @since 9.3 049 */ 050public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> { 051 protected static final long POLL_INTERVAL_MS = 100L; 052 053 // keep track of all tailers on the same namespace index even from different log 054 protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<>()); 055 056 private static final Log log = LogFactory.getLog(ChronicleLogTailer.class); 057 058 protected final String basePath; 059 060 protected final ExcerptTailer cqTailer; 061 062 protected final ChronicleLogOffsetTracker offsetTracker; 063 064 protected final LogPartitionGroup id; 065 066 protected final LogPartition partition; 067 068 protected final Codec<M> codec; 069 070 protected volatile boolean closed = false; 071 072 public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition, 073 String group, ChronicleRetentionDuration retention) { 074 Objects.requireNonNull(group); 075 this.codec = codec; 076 this.basePath = basePath; 077 this.cqTailer = cqTailer; 078 this.partition = partition; 079 this.id = new LogPartitionGroup(group, partition.name(), partition.partition()); 080 registerTailer(); 081 this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention); 082 toLastCommitted(); 083 } 084 085 protected void registerTailer() { 086 if (!tailersId.add(id)) { 087 throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id); 088 } 089 } 090 091 protected void unregisterTailer() { 092 tailersId.remove(id); 093 } 094 095 @Override 096 public LogRecord<M> read(Duration timeout) throws InterruptedException { 097 LogRecord<M> ret = read(); 098 if (ret != null) { 099 return ret; 100 } 101 long timeoutMs = timeout.toMillis(); 102 long deadline = System.currentTimeMillis() + timeoutMs; 103 long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 104 while (ret == null && System.currentTimeMillis() < deadline) { 105 Thread.sleep(delay); 106 ret = read(); 107 } 108 return ret; 109 } 110 111 @SuppressWarnings("unchecked") 112 protected LogRecord<M> read() { 113 if (closed) { 114 throw new IllegalStateException("The tailer has been closed."); 115 } 116 List<M> value = new ArrayList<>(1); 117 long offset = cqTailer.index(); 118 if (NO_CODEC.equals(codec)) { 119 // default format to keep backward compatibility 120 try { 121 if (!cqTailer.readDocument(w -> value.add((M) w.read(MSG_KEY).object()))) { 122 return null; 123 } 124 } catch (ClassCastException e) { 125 throw new IllegalArgumentException(e); 126 } 127 } else { 128 if (!cqTailer.readDocument(w -> value.add(codec.decode(w.read().bytes())))) { 129 return null; 130 } 131 } 132 return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset)); 133 } 134 135 @Override 136 public LogOffset commit(LogPartition partition) { 137 if (!this.partition.equals(partition)) { 138 throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id); 139 } 140 long offset = cqTailer.index(); 141 offsetTracker.commit(offset); 142 if (log.isTraceEnabled()) { 143 log.trace(String.format("Commit %s:+%d", id, offset)); 144 } 145 return new LogOffsetImpl(partition, offset); 146 } 147 148 @Override 149 public void commit() { 150 commit(partition); 151 } 152 153 @Override 154 public void toEnd() { 155 log.debug(String.format("toEnd: %s", id)); 156 cqTailer.toEnd(); 157 } 158 159 @Override 160 public void toStart() { 161 log.debug(String.format("toStart: %s", id)); 162 cqTailer.toStart(); 163 if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) { 164 log.info("Unable to move to start because the tailer is not initialized, " + this); 165 } 166 } 167 168 @Override 169 public void toLastCommitted() { 170 long offset = offsetTracker.getLastCommittedOffset(); 171 if (offset > 0) { 172 log.debug(String.format("toLastCommitted: %s, found: %d", id, offset)); 173 if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) { 174 // sometime moveToIndex returns false but offset is moved 175 throw new IllegalStateException( 176 "Unable to move to the last committed offset, " + this + " offset: " + offset); 177 } 178 } else { 179 log.debug(String.format("toLastCommitted: %s, not found, move toStart", id)); 180 cqTailer.toStart(); 181 } 182 } 183 184 @Override 185 public void seek(LogOffset offset) { 186 if (!this.partition.equals(offset.partition())) { 187 throw new IllegalStateException( 188 "Cannot seek, tailer " + this + " has no assignment for partition: " + offset); 189 } 190 log.debug("Seek to " + offset + " from tailer: " + this); 191 if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) { 192 throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset); 193 } 194 } 195 196 @Override 197 public void reset() { 198 reset(new LogPartition(id.name, id.partition)); 199 } 200 201 @Override 202 public void reset(LogPartition partition) { 203 if (!this.partition.equals(partition)) { 204 throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id); 205 } 206 log.info("Reset offset for partition: " + partition + " from tailer: " + this); 207 cqTailer.toStart(); 208 commit(partition); 209 } 210 211 @Override 212 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 213 throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp"); 214 } 215 216 @Override 217 public Collection<LogPartition> assignments() { 218 return Collections.singletonList(new LogPartition(id.name, id.partition)); 219 } 220 221 @Override 222 public String group() { 223 return id.group; 224 } 225 226 @Override 227 public void close() { 228 if (!closed) { 229 log.debug("Closing: " + toString()); 230 offsetTracker.close(); 231 unregisterTailer(); 232 closed = true; 233 } 234 } 235 236 @Override 237 public boolean closed() { 238 return closed; 239 } 240 241 @Override 242 public Codec<M> getCodec() { 243 return codec; 244 } 245 246 @Override 247 public String toString() { 248 return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" 249 + codec + '}'; 250 } 251 252}