001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 022 023import java.io.File; 024import java.io.IOException; 025import java.nio.file.Files; 026import java.nio.file.Path; 027import java.util.stream.Stream; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.lib.stream.log.Name; 032 033import net.openhft.chronicle.queue.ExcerptAppender; 034import net.openhft.chronicle.queue.ExcerptTailer; 035import net.openhft.chronicle.queue.TailerDirection; 036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 038 039/** 040 * Track committed offset for a Log. 041 * 042 * @since 9.3 043 */ 044public class ChronicleLogOffsetTracker implements AutoCloseable { 045 private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class); 046 047 protected static final String OFFSET_QUEUE_PREFIX = "offset-"; 048 049 // message are small, minimum block size of 256K is buggy so take the size above 050 // this will create a cq4 file of 1.3MB max message size is around 256KB 051 public static final int CQ_BLOCK_SIZE = 1_048_576; 052 053 protected final SingleChronicleQueue offsetQueue; 054 055 protected final int partition; 056 057 protected long lastCommittedOffset; 058 059 protected final ChronicleRetentionDuration retention; 060 061 public ChronicleLogOffsetTracker(String basePath, int partition, Name group, 062 ChronicleRetentionDuration retention) { 063 this.partition = partition; 064 this.retention = retention; 065 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group.getId()); 066 ChronicleRetentionListener listener = null; 067 SingleChronicleQueueBuilder builder = binary(offsetFile).rollCycle(retention.getRollCycle()) 068 .blockSize(CQ_BLOCK_SIZE); 069 if (!retention.disable() && partition == 0) { 070 // offset queue is shared among partitions 071 // only the first partition handle the retention 072 listener = new ChronicleRetentionListener(retention); 073 builder.storeFileListener(listener); 074 075 } 076 offsetQueue = builder.build(); 077 if (listener != null) { 078 listener.setQueue(offsetQueue); 079 } 080 } 081 082 public static boolean exists(Path basePath, Name group) { 083 try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group.getId()))) { 084 return paths.count() > 0; 085 } catch (IOException e) { 086 return false; 087 } 088 } 089 090 public static boolean isOffsetTracker(String dirName) { 091 return dirName.startsWith(OFFSET_QUEUE_PREFIX); 092 } 093 094 public static String getGroupFromDirectory(String dirName) { 095 if (!isOffsetTracker(dirName)) { 096 throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName)); 097 } 098 return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, ""); 099 } 100 101 /** 102 * Use a cache to return the last committed offset, concurrent consumer is not taken in account use 103 * {@link #readLastCommittedOffset()} in concurrency. 104 */ 105 public long getLastCommittedOffset() { 106 if (lastCommittedOffset > 0) { 107 return lastCommittedOffset; 108 } 109 return readLastCommittedOffset(); 110 } 111 112 /** 113 * Read the last committed offset from the file. 114 */ 115 public long readLastCommittedOffset() { 116 try { 117 return doReadLastCommittedOffset(); 118 } catch (IllegalStateException e) { 119 // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized 120 log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.", 121 offsetQueue.file().getAbsolutePath(), e.getMessage())); 122 // try again 123 return doReadLastCommittedOffset(); 124 } 125 } 126 127 protected long doReadLastCommittedOffset() { 128 try(ExcerptTailer offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd()) { 129 switch (offsetTailer.state()) { 130 case FOUND_CYCLE: 131 // expected case continue 132 break; 133 case UNINITIALISED: 134 // This is a new queue, we are not going to find anything 135 return 0; 136 default: 137 // border line cases that happens on unit tests and where the queue is not yet ready 138 log.info("Invalid offset tailer state: " + offsetQueue.file().getAbsolutePath() + ": " 139 + offsetTailer.state() + " taken as uninitialized"); 140 return 0; 141 } 142 final long[] offset = { 0 }; 143 boolean hasNext; 144 do { 145 hasNext = offsetTailer.readBytes(b -> { 146 int queue = b.readInt(); 147 long off = b.readLong(); 148 b.readLong(); // stamp not used 149 if (partition == queue) { 150 offset[0] = off; 151 } 152 }); 153 } while (offset[0] == 0 && hasNext); 154 return offset[0]; 155 } 156 } 157 158 public void commit(long offset) { 159 ExcerptAppender appender = offsetQueue.acquireAppender(); 160 appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis())); 161 lastCommittedOffset = offset; 162 } 163 164 @Override 165 public void close() { 166 if (!offsetQueue.isClosed()) { 167 offsetQueue.close(); 168 } 169 } 170 171}