001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 022 023import java.io.File; 024import java.io.IOException; 025import java.nio.file.Files; 026import java.nio.file.Path; 027import java.util.stream.Stream; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import net.openhft.chronicle.queue.ExcerptAppender; 033import net.openhft.chronicle.queue.ExcerptTailer; 034import net.openhft.chronicle.queue.TailerDirection; 035import net.openhft.chronicle.queue.TailerState; 036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 038 039/** 040 * Track committed offset for a Log. 041 * 042 * @since 9.3 043 */ 044public class ChronicleLogOffsetTracker implements AutoCloseable { 045 private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class); 046 047 protected static final String OFFSET_QUEUE_PREFIX = "offset-"; 048 049 protected final SingleChronicleQueue offsetQueue; 050 051 protected final int partition; 052 053 protected long lastCommittedOffset; 054 055 protected final ChronicleRetentionDuration retention; 056 057 public ChronicleLogOffsetTracker(String basePath, int partition, String group, 058 ChronicleRetentionDuration retention) { 059 this.partition = partition; 060 this.retention = retention; 061 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group); 062 SingleChronicleQueueBuilder builder = binary(offsetFile); 063 ChronicleRetentionListener listener = null; 064 if (!retention.disable()) { 065 builder.rollCycle(retention.getRollCycle()); 066 if (partition == 0) { 067 // offset queue is shared among partitions 068 // only the first partition handle the retention 069 listener = new ChronicleRetentionListener(retention); 070 builder.storeFileListener(listener); 071 } 072 } 073 offsetQueue = builder.build(); 074 if (listener != null) { 075 listener.setQueue(offsetQueue); 076 } 077 } 078 079 public ChronicleLogOffsetTracker(String basePath, int partition, String group) { 080 this(basePath, partition, group, ChronicleRetentionDuration.DISABLE); 081 } 082 083 public static boolean exists(Path basePath, String group) { 084 try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) { 085 return paths.count() > 0; 086 } catch (IOException e) { 087 return false; 088 } 089 } 090 091 public static boolean isOffsetTracker(String dirName) { 092 return dirName.startsWith(OFFSET_QUEUE_PREFIX); 093 } 094 095 public static String getGroupFromDirectory(String dirName) { 096 if (!isOffsetTracker(dirName)) { 097 throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName)); 098 } 099 return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, ""); 100 } 101 102 /** 103 * Use a cache to return the last committed offset, concurrent consumer is not taken in account use 104 * {@link #readLastCommittedOffset()} in concurrency. 105 */ 106 public long getLastCommittedOffset() { 107 if (lastCommittedOffset > 0) { 108 return lastCommittedOffset; 109 } 110 return readLastCommittedOffset(); 111 } 112 113 /** 114 * Read the last committed offset from the file. 115 */ 116 public long readLastCommittedOffset() { 117 ExcerptTailer offsetTailer; 118 try { 119 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 120 } catch (IllegalStateException e) { 121 // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized 122 log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.", 123 offsetQueue.file().getAbsolutePath(), e.getMessage())); 124 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 125 } 126 if (offsetTailer.state() == TailerState.UNINITIALISED) { 127 // This is a new queue, we are not going to find anything 128 return 0; 129 } 130 final long[] offset = { 0 }; 131 boolean hasNext; 132 do { 133 hasNext = offsetTailer.readBytes(b -> { 134 int queue = b.readInt(); 135 long off = b.readLong(); 136 b.readLong(); // stamp not used 137 if (partition == queue) { 138 offset[0] = off; 139 } 140 }); 141 } while (offset[0] == 0 && hasNext); 142 return offset[0]; 143 } 144 145 public void commit(long offset) { 146 ExcerptAppender appender = offsetQueue.acquireAppender(); 147 appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis())); 148 lastCommittedOffset = offset; 149 } 150 151 @Override 152 public void close() { 153 if (!offsetQueue.isClosed()) { 154 offsetQueue.close(); 155 } 156 } 157 158}