001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY; 023 024import java.io.Externalizable; 025import java.time.Duration; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.Objects; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.lib.stream.codec.Codec; 037import org.nuxeo.lib.stream.log.LogOffset; 038import org.nuxeo.lib.stream.log.LogPartition; 039import org.nuxeo.lib.stream.log.LogRecord; 040import org.nuxeo.lib.stream.log.LogTailer; 041import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 042import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 043 044import net.openhft.chronicle.queue.ExcerptTailer; 045import net.openhft.chronicle.queue.TailerState; 046 047/** 048 * @since 9.3 049 */ 050public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> { 051 protected static final long POLL_INTERVAL_MS = 100L; 052 053 // keep track of all tailers on the same namespace index even from different log 054 protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap( 055 new ConcurrentHashMap<LogPartitionGroup, Boolean>()); 056 057 private static final Log log = LogFactory.getLog(ChronicleLogTailer.class); 058 059 protected final String basePath; 060 061 protected final ExcerptTailer cqTailer; 062 063 protected final ChronicleLogOffsetTracker offsetTracker; 064 065 protected final LogPartitionGroup id; 066 067 protected final LogPartition partition; 068 069 protected final Codec<M> codec; 070 071 protected volatile boolean closed = false; 072 073 public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition, 074 String group, ChronicleRetentionDuration retention) { 075 Objects.requireNonNull(group); 076 this.codec = codec; 077 this.basePath = basePath; 078 this.cqTailer = cqTailer; 079 this.partition = partition; 080 this.id = new LogPartitionGroup(group, partition.name(), partition.partition()); 081 registerTailer(); 082 this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention); 083 toLastCommitted(); 084 } 085 086 protected void registerTailer() { 087 if (!tailersId.add(id)) { 088 throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id); 089 } 090 } 091 092 protected void unregisterTailer() { 093 tailersId.remove(id); 094 } 095 096 @Override 097 public LogRecord<M> read(Duration timeout) throws InterruptedException { 098 LogRecord<M> ret = read(); 099 if (ret != null) { 100 return ret; 101 } 102 long timeoutMs = timeout.toMillis(); 103 long deadline = System.currentTimeMillis() + timeoutMs; 104 long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 105 while (ret == null && System.currentTimeMillis() < deadline) { 106 Thread.sleep(delay); 107 ret = read(); 108 } 109 return ret; 110 } 111 112 @SuppressWarnings("unchecked") 113 protected LogRecord<M> read() { 114 if (closed) { 115 throw new IllegalStateException("The tailer has been closed."); 116 } 117 List<M> value = new ArrayList<>(1); 118 long offset = cqTailer.index(); 119 if (NO_CODEC.equals(codec)) { 120 // default format to keep backward compatibility 121 try { 122 if (!cqTailer.readDocument(w -> value.add((M) w.read(MSG_KEY).object()))) { 123 return null; 124 } 125 } catch (ClassCastException e) { 126 throw new IllegalArgumentException(e); 127 } 128 } else { 129 if (!cqTailer.readDocument(w -> value.add(codec.decode(w.read().bytes())))) { 130 return null; 131 } 132 } 133 return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset)); 134 } 135 136 @Override 137 public LogOffset commit(LogPartition partition) { 138 if (!this.partition.equals(partition)) { 139 throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id); 140 } 141 long offset = cqTailer.index(); 142 offsetTracker.commit(offset); 143 if (log.isTraceEnabled()) { 144 log.trace(String.format("Commit %s:+%d", id, offset)); 145 } 146 return new LogOffsetImpl(partition, offset); 147 } 148 149 @Override 150 public void commit() { 151 commit(partition); 152 } 153 154 @Override 155 public void toEnd() { 156 log.debug(String.format("toEnd: %s", id)); 157 cqTailer.toEnd(); 158 } 159 160 @Override 161 public void toStart() { 162 log.debug(String.format("toStart: %s", id)); 163 cqTailer.toStart(); 164 if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) { 165 log.info("Unable to move to start because the tailer is not initialized, " + this); 166 } 167 } 168 169 @Override 170 public void toLastCommitted() { 171 long offset = offsetTracker.getLastCommittedOffset(); 172 if (offset > 0) { 173 log.debug(String.format("toLastCommitted: %s, found: %d", id, offset)); 174 if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) { 175 // sometime moveToIndex returns false but offset is moved 176 throw new IllegalStateException( 177 "Unable to move to the last committed offset, " + this + " offset: " + offset); 178 } 179 } else { 180 log.debug(String.format("toLastCommitted: %s, not found, move toStart", id)); 181 cqTailer.toStart(); 182 } 183 } 184 185 @Override 186 public void seek(LogOffset offset) { 187 if (!this.partition.equals(offset.partition())) { 188 throw new IllegalStateException( 189 "Cannot seek, tailer " + this + " has no assignment for partition: " + offset); 190 } 191 log.debug("Seek to " + offset + " from tailer: " + this); 192 if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) { 193 throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset); 194 } 195 } 196 197 @Override 198 public void reset() { 199 reset(new LogPartition(id.name, id.partition)); 200 } 201 202 @Override 203 public void reset(LogPartition partition) { 204 if (!this.partition.equals(partition)) { 205 throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id); 206 } 207 log.info("Reset offset for partition: " + partition + " from tailer: " + this); 208 cqTailer.toStart(); 209 commit(partition); 210 } 211 212 @Override 213 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 214 throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp"); 215 } 216 217 @Override 218 public Collection<LogPartition> assignments() { 219 return Collections.singletonList(new LogPartition(id.name, id.partition)); 220 } 221 222 @Override 223 public String group() { 224 return id.group; 225 } 226 227 @Override 228 public void close() { 229 if (!closed) { 230 log.debug("Closing: " + toString()); 231 offsetTracker.close(); 232 unregisterTailer(); 233 closed = true; 234 } 235 } 236 237 @Override 238 public boolean closed() { 239 return closed; 240 } 241 242 @Override 243 public Codec<M> getCodec() { 244 return codec; 245 } 246 247 @Override 248 public String toString() { 249 return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec=" 250 + codec + '}'; 251 } 252 253}