001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.Externalizable; 022import java.time.Duration; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.List; 027import java.util.Objects; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.lib.stream.log.LogOffset; 034import org.nuxeo.lib.stream.log.LogPartition; 035import org.nuxeo.lib.stream.log.LogRecord; 036import org.nuxeo.lib.stream.log.LogTailer; 037import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 038import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 039 040import net.openhft.chronicle.queue.ExcerptTailer; 041 042/** 043 * @since 9.3 044 */ 045public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> { 046 protected static final long POLL_INTERVAL_MS = 100L; 047 048 // keep track of all tailers on the same namespace index even from different log 049 protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap( 050 new ConcurrentHashMap<LogPartitionGroup, Boolean>()); 051 052 private static final Log log = LogFactory.getLog(ChronicleLogTailer.class); 053 054 protected final String basePath; 055 056 protected final ExcerptTailer cqTailer; 057 058 protected final ChronicleLogOffsetTracker offsetTracker; 059 060 protected final LogPartitionGroup id; 061 062 protected final LogPartition partition; 063 064 protected volatile boolean closed = false; 065 066 public ChronicleLogTailer(String basePath, ExcerptTailer cqTailer, LogPartition partition, String group, 067 ChronicleRetentionDuration retention) { 068 Objects.requireNonNull(group); 069 this.basePath = basePath; 070 this.cqTailer = cqTailer; 071 this.partition = partition; 072 this.id = new LogPartitionGroup(group, partition.name(), partition.partition()); 073 registerTailer(); 074 this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention); 075 toLastCommitted(); 076 } 077 078 protected void registerTailer() { 079 if (!tailersId.add(id)) { 080 throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id); 081 } 082 } 083 084 protected void unregisterTailer() { 085 tailersId.remove(id); 086 } 087 088 @Override 089 public LogRecord<M> read(Duration timeout) throws InterruptedException { 090 LogRecord<M> ret = read(); 091 if (ret != null) { 092 return ret; 093 } 094 long timeoutMs = timeout.toMillis(); 095 long deadline = System.currentTimeMillis() + timeoutMs; 096 long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 097 while (ret == null && System.currentTimeMillis() < deadline) { 098 Thread.sleep(delay); 099 ret = read(); 100 } 101 return ret; 102 } 103 104 @SuppressWarnings("unchecked") 105 protected LogRecord<M> read() { 106 if (closed) { 107 throw new IllegalStateException("The tailer has been closed."); 108 } 109 List<M> value = new ArrayList<>(1); 110 long offset = cqTailer.index(); 111 if (!cqTailer.readDocument(w -> value.add((M) w.read("msg").object()))) { 112 return null; 113 } 114 return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset)); 115 } 116 117 @Override 118 public LogOffset commit(LogPartition partition) { 119 if (!this.partition.equals(partition)) { 120 throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id); 121 } 122 long offset = cqTailer.index(); 123 offsetTracker.commit(offset); 124 if (log.isTraceEnabled()) { 125 log.trace(String.format("Commit %s:+%d", id, offset)); 126 } 127 return new LogOffsetImpl(partition, offset); 128 } 129 130 @Override 131 public void commit() { 132 commit(partition); 133 } 134 135 @Override 136 public void toEnd() { 137 log.debug(String.format("toEnd: %s", id)); 138 cqTailer.toEnd(); 139 } 140 141 @Override 142 public void toStart() { 143 log.debug(String.format("toStart: %s", id)); 144 cqTailer.toStart(); 145 } 146 147 @Override 148 public void toLastCommitted() { 149 long offset = offsetTracker.getLastCommittedOffset(); 150 if (offset > 0) { 151 log.debug(String.format("toLastCommitted: %s, found: %d", id, offset)); 152 cqTailer.moveToIndex(offset); 153 } else { 154 log.debug(String.format("toLastCommitted: %s, not found, move toStart", id)); 155 cqTailer.toStart(); 156 } 157 } 158 159 @Override 160 public void seek(LogOffset offset) { 161 if (!this.partition.equals(offset.partition())) { 162 throw new IllegalStateException( 163 "Cannot seek, tailer " + this + " has no assignment for partition: " + offset); 164 } 165 log.debug("Seek to " + offset + " from tailer: " + this); 166 cqTailer.moveToIndex(offset.offset()); 167 } 168 169 @Override 170 public void reset() { 171 reset(new LogPartition(id.name, id.partition)); 172 } 173 174 @Override 175 public void reset(LogPartition partition) { 176 if (!this.partition.equals(partition)) { 177 throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id); 178 } 179 log.info("Reset offset for partition: " + partition + " from tailer: " + this); 180 cqTailer.toStart(); 181 commit(partition); 182 } 183 184 @Override 185 public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) { 186 throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp"); 187 } 188 189 @Override 190 public Collection<LogPartition> assignments() { 191 return Collections.singletonList(new LogPartition(id.name, id.partition)); 192 } 193 194 @Override 195 public String group() { 196 return id.group; 197 } 198 199 @Override 200 public void close() { 201 if (!closed) { 202 log.debug("Closing: " + toString()); 203 offsetTracker.close(); 204 unregisterTailer(); 205 closed = true; 206 } 207 } 208 209 @Override 210 public boolean closed() { 211 return closed; 212 } 213 214 @Override 215 public String toString() { 216 return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + '}'; 217 } 218 219}