/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.TailerState;

/**
 * A {@link LogTailer} that reads the records of a single {@link LogPartition} through a Chronicle Queue
 * {@link ExcerptTailer}.
 * <p>
 * Each instance registers its (group, log name, partition) identifier in a static set at construction time and
 * removes it on {@link #close()}, so that only one tailer per identifier can exist at a time.
 *
 * @since 9.3
 */
public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
    /** Maximum pause, in milliseconds, between two read attempts in {@link #read(Duration)}. */
    protected static final long POLL_INTERVAL_MS = 100L;

    // keep track of all tailers on the same namespace index even from different log
    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<>());

    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);

    protected final String basePath;

    protected final ExcerptTailer cqTailer;

    /** Created lazily by {@link #getOffsetTracker()}, released by {@link #close()}. */
    protected ChronicleLogOffsetTracker offsetTracker;

    protected final ChronicleRetentionDuration retention;

    /** Identifier of this tailer: consumer group, log name and partition number. */
    protected final LogPartitionGroup id;

    protected final LogPartition partition;

    protected final Codec<M> codec;

    protected volatile boolean closed = false;

    /** True once the tailer has been positioned by one of the positioning methods. */
    protected boolean initialized;

    /**
     * Creates a tailer and registers its identifier.
     *
     * @param codec the codec used to decode records, {@code NO_CODEC} selects the legacy object format
     * @param basePath the base path handed to the offset tracker
     * @param cqTailer the underlying Chronicle Queue tailer
     * @param partition the partition read by this tailer
     * @param group the consumer group, must not be {@code null}
     * @param retention the retention handed to the offset tracker
     * @throws NullPointerException if {@code group} is {@code null}
     * @throws IllegalArgumentException if a tailer with the same group, log name and partition is already
     *             registered
     */
    public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition,
            Name group, ChronicleRetentionDuration retention) {
        Objects.requireNonNull(group);
        this.codec = codec;
        this.basePath = basePath;
        this.cqTailer = cqTailer;
        this.partition = partition;
        this.retention = retention;
        this.id = new LogPartitionGroup(group, partition.name(), partition.partition());
        registerTailer();
    }

    /**
     * Returns the offset tracker of this tailer, creating it on first access.
     */
    protected ChronicleLogOffsetTracker getOffsetTracker() {
        if (offsetTracker == null) {
            offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group(), retention);
        }
        return offsetTracker;
    }

    /**
     * Adds the identifier of this tailer to the static registry.
     *
     * @throws IllegalArgumentException if the identifier is already registered
     */
    protected void registerTailer() {
        if (!tailersId.add(id)) {
            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
        }
    }

    /**
     * Positions the tailer on the last committed offset unless it has already been positioned.
     */
    protected void checkInitialized() {
        if (initialized) {
            return;
        }
        toLastCommitted();
    }

    /**
     * Removes the identifier of this tailer from the static registry.
     */
    protected void unregisterTailer() {
        tailersId.remove(id);
    }

    /**
     * Reads the next record, polling until one is available or the timeout has elapsed.
     *
     * @param timeout the maximum time to wait, a zero or negative duration results in a single read attempt
     * @return the next record or {@code null} if none became available in time
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalStateException if the tailer is closed
     */
    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        LogRecord<M> ret = read();
        if (ret != null) {
            return ret;
        }
        long timeoutMs = timeout.toMillis();
        long now = System.currentTimeMillis();
        long deadline = now + timeoutMs;
        if (timeoutMs > 0 && deadline < now) {
            // the addition overflowed: a huge timeout must wait, not return immediately
            deadline = Long.MAX_VALUE;
        }
        long remaining = deadline - now;
        while (ret == null && remaining > 0) {
            // never sleep past the deadline
            Thread.sleep(Math.min(POLL_INTERVAL_MS, remaining));
            ret = read();
            remaining = deadline - System.currentTimeMillis();
        }
        return ret;
    }

    /**
     * Reads the next record without waiting.
     *
     * @return the next record or {@code null} if the underlying tailer has no document to read
     * @throws IllegalStateException if the tailer is closed
     * @throws IllegalArgumentException if a legacy format read fails with a {@link ClassCastException}
     */
    @SuppressWarnings("unchecked")
    protected LogRecord<M> read() {
        if (closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        checkInitialized();
        // holders written from within the read callbacks
        List<M> value = new ArrayList<>(1);
        AtomicLong offset = new AtomicLong();
        if (NO_CODEC.equals(codec)) {
            // default format to keep backward compatibility
            try {
                if (!cqTailer.readDocument(w -> {
                    offset.set(cqTailer.index());
                    value.add((M) w.read(MSG_KEY).object());
                })) {
                    return null;
                }
            } catch (ClassCastException e) {
                throw new IllegalArgumentException(e);
            }
        } else {
            if (!cqTailer.readDocument(w -> {
                offset.set(cqTailer.index());
                value.add(codec.decode(w.read().bytes()));
            })) {
                return null;
            }
        }
        return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset.get()));
    }

    /**
     * Commits the current index of the underlying tailer for the given partition.
     *
     * @param partition the partition to commit, must be equal to the partition of this tailer
     * @return the committed offset
     * @throws IllegalArgumentException if the partition is not the one read by this tailer
     */
    @Override
    public LogOffset commit(LogPartition partition) {
        checkInitialized();
        if (!this.partition.equals(partition)) {
            throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id);
        }
        long offset = cqTailer.index();
        getOffsetTracker().commit(offset);
        if (log.isTraceEnabled()) {
            log.trace(String.format("Commit %s:+%d", id, offset));
        }
        return new LogOffsetImpl(partition, offset);
    }

    /**
     * Commits the current index of the underlying tailer for the partition of this tailer.
     */
    @Override
    public void commit() {
        commit(partition);
    }

    /**
     * Moves the underlying tailer to the end and marks this tailer as positioned.
     */
    @Override
    public void toEnd() {
        if (log.isDebugEnabled()) {
            log.debug(String.format("toEnd: %s", id));
        }
        cqTailer.toEnd();
        initialized = true;
    }

    /**
     * Moves the underlying tailer to the start and marks this tailer as positioned. An info message is logged when
     * the underlying tailer is not in the {@code FOUND_CYCLE} state after the move.
     */
    @Override
    public void toStart() {
        if (log.isDebugEnabled()) {
            log.debug(String.format("toStart: %s", id));
        }
        cqTailer.toStart();
        if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) {
            log.info("Unable to move to start because the tailer is not initialized, " + this);
        }
        initialized = true;
    }

    /**
     * Moves the tailer to the last committed offset, or to the start when the tracker reports no positive offset.
     * <p>
     * When the committed offset cannot be reached, the tailer is moved to the start: if the committed offset is
     * lower than the start offset an error is logged and reading continues from the start, otherwise the move is
     * considered a failure.
     *
     * @throws IllegalStateException if the committed offset cannot be reached and is not lower than the start
     *             offset
     */
    @Override
    public void toLastCommitted() {
        boolean debug = log.isDebugEnabled();
        if (debug) {
            log.debug(String.format("toLastCommitted: %s", id));
        }
        long offset = getOffsetTracker().getLastCommittedOffset();
        if (offset > 0) {
            if (debug) {
                log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
            }
            if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) {
                // sometime moveToIndex returns false but offset is moved
                toStart();
                long startOffset = cqTailer.index();
                if (offset < startOffset) {
                    log.error("The last committed offset: " + offset + " for tailer: " + this
                            + " points to a record that has been deleted by the retention policy."
                            + " Records have been lost, continuing from the beginning of the partition offset: "
                            + startOffset);
                } else {
                    // probably a corrupted Log where some cq4 files are missing
                    throw new IllegalStateException(
                            "Unable to move to the last committed offset: " + offset + " for tailer: " + this);
                }
            }
        } else {
            if (debug) {
                log.debug(String.format("toLastCommitted: %s, not found, move toStart", id));
            }
            cqTailer.toStart();
        }
        initialized = true;
    }

    /**
     * Moves the underlying tailer to the given offset and marks this tailer as positioned.
     *
     * @param offset the target offset, its partition must be equal to the partition of this tailer
     * @throws IllegalStateException if the offset belongs to another partition or cannot be reached
     */
    @Override
    public void seek(LogOffset offset) {
        if (!this.partition.equals(offset.partition())) {
            throw new IllegalStateException(
                    "Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
        }
        if (log.isDebugEnabled()) {
            log.debug("Seek to " + offset + " from tailer: " + this);
        }
        if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) {
            throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset);
        }
        initialized = true;
    }

    /**
     * Resets the committed offset of the partition of this tailer, see {@link #reset(LogPartition)}.
     */
    @Override
    public void reset() {
        reset(new LogPartition(id.name, id.partition));
    }

    /**
     * Moves the underlying tailer to the start and commits that position.
     *
     * @param partition the partition to reset, must be equal to the partition of this tailer
     * @throws IllegalArgumentException if the partition is not the one read by this tailer
     */
    @Override
    public void reset(LogPartition partition) {
        if (!this.partition.equals(partition)) {
            throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id);
        }
        log.info("Reset offset for partition: " + partition + " from tailer: " + this);
        cqTailer.toStart();
        // mark as positioned first so that commit does not move back to the last committed offset
        initialized = true;
        commit(partition);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
    }

    /**
     * Returns a single element collection holding the partition described by the identifier of this tailer.
     */
    @Override
    public Collection<LogPartition> assignments() {
        return Collections.singletonList(new LogPartition(id.name, id.partition));
    }

    /**
     * Returns the consumer group of this tailer.
     */
    @Override
    public Name group() {
        return id.group;
    }

    /**
     * Closes the offset tracker if one was created, unregisters the tailer and marks it as closed. Calling this
     * method on an already closed tailer does nothing.
     * <p>
     * The tailer is unregistered and marked as closed even when closing the offset tracker fails, otherwise its
     * identifier would stay in the static registry and prevent the creation of a replacement tailer.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Closing: " + toString());
        }
        try {
            if (offsetTracker != null) {
                offsetTracker.close();
            }
        } finally {
            offsetTracker = null;
            unregisterTailer();
            closed = true;
            initialized = false;
        }
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    @Override
    public String toString() {
        return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec="
                + codec + '}';
    }

}