001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 022 023import java.io.File; 024import java.io.IOException; 025import java.nio.file.Files; 026import java.nio.file.Path; 027import java.util.stream.Stream; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import net.openhft.chronicle.queue.ExcerptAppender; 033import net.openhft.chronicle.queue.ExcerptTailer; 034import net.openhft.chronicle.queue.TailerDirection; 035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 037 038/** 039 * Track committed offset for a Log. 040 * 041 * @since 9.3 042 */ 043public class ChronicleLogOffsetTracker implements AutoCloseable { 044 private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class); 045 046 protected static final String OFFSET_QUEUE_PREFIX = "offset-"; 047 048 protected final SingleChronicleQueue offsetQueue; 049 050 protected final int partition; 051 052 protected long lastCommittedOffset; 053 054 protected final ChronicleRetentionDuration retention; 055 056 public ChronicleLogOffsetTracker(String basePath, int partition, String group, 057 ChronicleRetentionDuration retention) { 058 this.partition = partition; 059 this.retention = retention; 060 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group); 061 SingleChronicleQueueBuilder builder = binary(offsetFile); 062 ChronicleRetentionListener listener = null; 063 if (!retention.disable()) { 064 builder.rollCycle(retention.getRollCycle()); 065 if (partition == 0) { 066 // offset queue is shared among partitions 067 // only the first partition handle the retention 068 listener = new ChronicleRetentionListener(retention); 069 builder.storeFileListener(listener); 070 } 071 } 072 offsetQueue = builder.build(); 073 if (listener != null) { 074 listener.setQueue(offsetQueue); 075 } 076 } 077 078 public ChronicleLogOffsetTracker(String basePath, int partition, String group) { 079 this(basePath, partition, group, ChronicleRetentionDuration.DISABLE); 080 } 081 082 public static boolean exists(Path basePath, String group) { 083 try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) { 084 return paths.count() > 0; 085 } catch (IOException e) { 086 return false; 087 } 088 } 089 090 public static boolean isOffsetTracker(String dirName) { 091 return dirName.startsWith(OFFSET_QUEUE_PREFIX); 092 } 093 094 public static String getGroupFromDirectory(String dirName) { 095 if (!isOffsetTracker(dirName)) { 096 throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName)); 097 } 098 return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, ""); 099 } 100 101 /** 102 * Use a cache to return the last committed offset, concurrent consumer is not taken in account use 103 * {@link #readLastCommittedOffset()} in concurrency. 104 */ 105 public long getLastCommittedOffset() { 106 if (lastCommittedOffset > 0) { 107 return lastCommittedOffset; 108 } 109 return readLastCommittedOffset(); 110 } 111 112 /** 113 * Read the last committed offset from the file. 114 */ 115 public long readLastCommittedOffset() { 116 ExcerptTailer offsetTailer; 117 try { 118 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 119 } catch (IllegalStateException e) { 120 // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized 121 log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.", 122 offsetQueue.file().getAbsolutePath(), e.getMessage())); 123 offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 124 } 125 final long[] offset = { 0 }; 126 boolean hasNext; 127 do { 128 hasNext = offsetTailer.readBytes(b -> { 129 int queue = b.readInt(); 130 long off = b.readLong(); 131 b.readLong(); // stamp not used 132 if (partition == queue) { 133 offset[0] = off; 134 } 135 }); 136 } while (offset[0] == 0 && hasNext); 137 // System.out.println("last committed returned from: " + offsetQueue.file() + " " + offset[0] + " hasNext: " + 138 // hasNext); 139 return offset[0]; 140 } 141 142 public void commit(long offset) { 143 ExcerptAppender appender = offsetQueue.acquireAppender(); 144 appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis())); 145 // System.out.println(String.format("COMMIT %s, partition: %s, offset: %s, pos: %s", 146 // offsetQueue.file(), partition, offset, appender.lastIndexAppended())); 147 lastCommittedOffset = offset; 148 } 149 150 @Override 151 public void close() { 152 offsetQueue.close(); 153 } 154 155}