001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 022 023import java.io.File; 024import java.io.IOException; 025import java.nio.file.Files; 026import java.nio.file.Path; 027import java.util.stream.Stream; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import net.openhft.chronicle.queue.ExcerptAppender; 033import net.openhft.chronicle.queue.ExcerptTailer; 034import net.openhft.chronicle.queue.TailerDirection; 035import net.openhft.chronicle.queue.TailerState; 036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 038 039/** 040 * Track committed offset for a Log. 041 * 042 * @since 9.3 043 */ 044public class ChronicleLogOffsetTracker implements AutoCloseable { 045 private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class); 046 047 protected static final String OFFSET_QUEUE_PREFIX = "offset-"; 048 049 // message are small, minimum block size of 256K is buggy so take the size above 050 // this will create a cq4 file of 1.3MB max message size is around 256KB 051 public static final int CQ_BLOCK_SIZE = 1_048_576; 052 053 protected final SingleChronicleQueue offsetQueue; 054 055 protected final int partition; 056 057 protected long lastCommittedOffset; 058 059 protected final ChronicleRetentionDuration retention; 060 061 public ChronicleLogOffsetTracker(String basePath, int partition, String group, 062 ChronicleRetentionDuration retention) { 063 this.partition = partition; 064 this.retention = retention; 065 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group); 066 ChronicleRetentionListener listener = null; 067 SingleChronicleQueueBuilder builder = binary(offsetFile).rollCycle(retention.getRollCycle()) 068 .blockSize(CQ_BLOCK_SIZE); 069 if (!retention.disable() && partition == 0) { 070 // offset queue is shared among partitions 071 // only the first partition handle the retention 072 listener = new ChronicleRetentionListener(retention); 073 builder.storeFileListener(listener); 074 075 } 076 offsetQueue = builder.build(); 077 if (listener != null) { 078 listener.setQueue(offsetQueue); 079 } 080 } 081 082 public static boolean exists(Path basePath, String group) { 083 try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) { 084 return paths.count() > 0; 085 } catch (IOException e) { 086 return false; 087 } 088 } 089 090 public static boolean isOffsetTracker(String dirName) { 091 return dirName.startsWith(OFFSET_QUEUE_PREFIX); 092 } 093 094 public static String getGroupFromDirectory(String dirName) { 095 if (!isOffsetTracker(dirName)) { 096 throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName)); 097 } 098 return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, ""); 099 } 100 101 /** 102 * Use a cache to return the last committed offset, concurrent consumer is not taken in account use 103 * {@link #readLastCommittedOffset()} in concurrency. 104 */ 105 public long getLastCommittedOffset() { 106 if (lastCommittedOffset > 0) { 107 return lastCommittedOffset; 108 } 109 return readLastCommittedOffset(); 110 } 111 112 /** 113 * Read the last committed offset from the file. 114 */ 115 public long readLastCommittedOffset() { 116 ExcerptTailer offsetTailer; 117 try { 118 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 119 } catch (IllegalStateException e) { 120 // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized 121 log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.", 122 offsetQueue.file().getAbsolutePath(), e.getMessage())); 123 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 124 } 125 if (offsetTailer.state() == TailerState.UNINITIALISED) { 126 // This is a new queue, we are not going to find anything 127 return 0; 128 } 129 final long[] offset = { 0 }; 130 boolean hasNext; 131 do { 132 hasNext = offsetTailer.readBytes(b -> { 133 int queue = b.readInt(); 134 long off = b.readLong(); 135 b.readLong(); // stamp not used 136 if (partition == queue) { 137 offset[0] = off; 138 } 139 }); 140 } while (offset[0] == 0 && hasNext); 141 return offset[0]; 142 } 143 144 public void commit(long offset) { 145 ExcerptAppender appender = offsetQueue.acquireAppender(); 146 appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis())); 147 lastCommittedOffset = offset; 148 } 149 150 @Override 151 public void close() { 152 if (!offsetQueue.isClosed()) { 153 offsetQueue.close(); 154 } 155 } 156 157}