001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;
020
021import net.openhft.chronicle.queue.ExcerptTailer;
022import net.openhft.chronicle.queue.TailerDirection;
023import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
024
025import java.io.File;
026
027import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
028
029/**
030 * Track committed offset for a queue.
031 *
032 * @since 9.1
033 */
034public class ChronicleMQOffsetTracker implements AutoCloseable {
035    private final SingleChronicleQueue offsetQueue;
036    private final int queueIndex;
037    private static final String OFFSET_QUEUE_PREFIX = "offset-";
038    private long lastCommittedOffset;
039
040    public ChronicleMQOffsetTracker(String basePath, int queue, String group) {
041        queueIndex = queue;
042        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group);
043        offsetQueue = binary(offsetFile).build();
044    }
045
046    /**
047     * Use a cache to return the last committed offset, concurrent consumer is not taken in account
048     * use {@link #readLastCommittedOffset()} in concurrency.
049     */
050    public long getLastCommittedOffset() {
051        if (lastCommittedOffset > 0) {
052            return lastCommittedOffset;
053        }
054        return readLastCommittedOffset();
055    }
056
057    /**
058     * Read the last committed offset from the file.
059     */
060    public long readLastCommittedOffset() {
061        ExcerptTailer offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
062        final long[] offset = {0};
063        boolean hasNext;
064        do {
065            hasNext = offsetTailer.readBytes(b -> {
066                int queue = b.readInt();
067                long off = b.readLong();
068                long stamp = b.readLong();
069                if (queueIndex == queue) {
070                    offset[0] = off;
071                }
072            });
073        } while (offset[0] == 0 && hasNext);
074        return offset[0];
075    }
076
077    public void commit(long offset) {
078        offsetQueue.acquireAppender().writeBytes(b -> b.writeInt(queueIndex).writeLong(offset).writeLong(System.currentTimeMillis()));
079        lastCommittedOffset = offset;
080    }
081
082    @Override
083    public void close() {
084        offsetQueue.close();
085    }
086
087}