001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 022 023import java.io.File; 024import java.io.IOException; 025import java.nio.file.Files; 026import java.nio.file.Path; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030 031import net.openhft.chronicle.queue.ExcerptAppender; 032import net.openhft.chronicle.queue.ExcerptTailer; 033import net.openhft.chronicle.queue.TailerDirection; 034import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 036 037/** 038 * Track committed offset for a Log. 039 * 040 * @since 9.3 041 */ 042public class ChronicleLogOffsetTracker implements AutoCloseable { 043 private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class); 044 045 protected static final String OFFSET_QUEUE_PREFIX = "offset-"; 046 047 protected final SingleChronicleQueue offsetQueue; 048 049 protected final int partition; 050 051 protected long lastCommittedOffset; 052 053 protected final ChronicleRetentionDuration retention; 054 055 public ChronicleLogOffsetTracker(String basePath, int partition, String group, 056 ChronicleRetentionDuration retention) { 057 this.partition = partition; 058 this.retention = retention; 059 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group); 060 SingleChronicleQueueBuilder builder = binary(offsetFile); 061 ChronicleRetentionListener listener = null; 062 if (!retention.disable()) { 063 builder.rollCycle(retention.getRollCycle()); 064 if (partition == 0) { 065 // offset queue is shared among partitions 066 // only the first partition handle the retention 067 listener = new ChronicleRetentionListener(retention); 068 builder.storeFileListener(listener); 069 } 070 } 071 offsetQueue = builder.build(); 072 if (listener != null) { 073 listener.setQueue(offsetQueue); 074 } 075 } 076 077 public ChronicleLogOffsetTracker(String basePath, int partition, String group) { 078 this(basePath, partition, group, ChronicleRetentionDuration.DISABLE); 079 } 080 081 public static boolean exists(Path basePath, String group) { 082 try { 083 return Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group)).count() > 0; 084 } catch (IOException e) { 085 return false; 086 } 087 } 088 089 public static boolean isOffsetTracker(String dirName) { 090 return dirName.startsWith(OFFSET_QUEUE_PREFIX); 091 } 092 093 public static String getGroupFromDirectory(String dirName) { 094 if (!isOffsetTracker(dirName)) { 095 throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName)); 096 } 097 return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, ""); 098 } 099 100 /** 101 * Use a cache to return the last committed offset, concurrent consumer is not taken in account use 102 * {@link #readLastCommittedOffset()} in concurrency. 103 */ 104 public long getLastCommittedOffset() { 105 if (lastCommittedOffset > 0) { 106 return lastCommittedOffset; 107 } 108 return readLastCommittedOffset(); 109 } 110 111 /** 112 * Read the last committed offset from the file. 113 */ 114 public long readLastCommittedOffset() { 115 ExcerptTailer offsetTailer; 116 try { 117 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 118 } catch (IllegalStateException e) { 119 // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized 120 log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.", 121 offsetQueue.file().getAbsolutePath(), e.getMessage())); 122 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 123 } 124 final long[] offset = { 0 }; 125 boolean hasNext; 126 do { 127 hasNext = offsetTailer.readBytes(b -> { 128 int queue = b.readInt(); 129 long off = b.readLong(); 130 b.readLong(); // stamp not used 131 if (partition == queue) { 132 offset[0] = off; 133 } 134 }); 135 } while (offset[0] == 0 && hasNext); 136 // System.out.println("last committed returned from: " + offsetQueue.file() + " " + offset[0] + " hasNext: " + 137 // hasNext); 138 return offset[0]; 139 } 140 141 public void commit(long offset) { 142 ExcerptAppender appender = offsetQueue.acquireAppender(); 143 appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis())); 144 // System.out.println(String.format("COMMIT %s, partition: %s, offset: %s, pos: %s", 145 // offsetQueue.file(), partition, offset, appender.lastIndexAppended())); 146 lastCommittedOffset = offset; 147 } 148 149 @Override 150 public void close() { 151 offsetQueue.close(); 152 } 153 154}