001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle; 020 021import net.openhft.chronicle.queue.ExcerptTailer; 022import net.openhft.chronicle.queue.TailerDirection; 023import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 024 025import java.io.File; 026 027import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 028 029/** 030 * Track committed offset for a queue. 031 * 032 * @since 9.1 033 */ 034public class ChronicleMQOffsetTracker implements AutoCloseable { 035 private final SingleChronicleQueue offsetQueue; 036 private final int queueIndex; 037 private static final String OFFSET_QUEUE_PREFIX = "offset-"; 038 private long lastCommittedOffset; 039 040 public ChronicleMQOffsetTracker(String basePath, int queue, String group) { 041 queueIndex = queue; 042 File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group); 043 offsetQueue = binary(offsetFile).build(); 044 } 045 046 /** 047 * Use a cache to return the last committed offset, concurrent consumer is not taken in account 048 * use {@link #readLastCommittedOffset()} in concurrency. 049 */ 050 public long getLastCommittedOffset() { 051 if (lastCommittedOffset > 0) { 052 return lastCommittedOffset; 053 } 054 return readLastCommittedOffset(); 055 } 056 057 /** 058 * Read the last committed offset from the file. 059 */ 060 public long readLastCommittedOffset() { 061 ExcerptTailer offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd(); 062 final long[] offset = {0}; 063 boolean hasNext; 064 do { 065 hasNext = offsetTailer.readBytes(b -> { 066 int queue = b.readInt(); 067 long off = b.readLong(); 068 long stamp = b.readLong(); 069 if (queueIndex == queue) { 070 offset[0] = off; 071 } 072 }); 073 } while (offset[0] == 0 && hasNext); 074 return offset[0]; 075 } 076 077 public void commit(long offset) { 078 offsetQueue.acquireAppender().writeBytes(b -> b.writeInt(queueIndex).writeLong(offset).writeLong(System.currentTimeMillis())); 079 lastCommittedOffset = offset; 080 } 081 082 @Override 083 public void close() { 084 offsetQueue.close(); 085 } 086 087}