001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
022
023import java.io.File;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.stream.Stream;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import net.openhft.chronicle.queue.ExcerptAppender;
033import net.openhft.chronicle.queue.ExcerptTailer;
034import net.openhft.chronicle.queue.TailerDirection;
035import net.openhft.chronicle.queue.TailerState;
036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
038
039/**
040 * Track committed offset for a Log.
041 *
042 * @since 9.3
043 */
044public class ChronicleLogOffsetTracker implements AutoCloseable {
045    private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class);
046
047    protected static final String OFFSET_QUEUE_PREFIX = "offset-";
048
049    protected final SingleChronicleQueue offsetQueue;
050
051    protected final int partition;
052
053    protected long lastCommittedOffset;
054
055    protected final ChronicleRetentionDuration retention;
056
057    public ChronicleLogOffsetTracker(String basePath, int partition, String group,
058            ChronicleRetentionDuration retention) {
059        this.partition = partition;
060        this.retention = retention;
061        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group);
062        SingleChronicleQueueBuilder builder = binary(offsetFile);
063        ChronicleRetentionListener listener = null;
064        if (!retention.disable()) {
065            builder.rollCycle(retention.getRollCycle());
066            if (partition == 0) {
067                // offset queue is shared among partitions
068                // only the first partition handle the retention
069                listener = new ChronicleRetentionListener(retention);
070                builder.storeFileListener(listener);
071            }
072        }
073        offsetQueue = builder.build();
074        if (listener != null) {
075            listener.setQueue(offsetQueue);
076        }
077    }
078
079    public ChronicleLogOffsetTracker(String basePath, int partition, String group) {
080        this(basePath, partition, group, ChronicleRetentionDuration.DISABLE);
081    }
082
083    public static boolean exists(Path basePath, String group) {
084        try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) {
085            return paths.count() > 0;
086        } catch (IOException e) {
087            return false;
088        }
089    }
090
091    public static boolean isOffsetTracker(String dirName) {
092        return dirName.startsWith(OFFSET_QUEUE_PREFIX);
093    }
094
095    public static String getGroupFromDirectory(String dirName) {
096        if (!isOffsetTracker(dirName)) {
097            throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName));
098        }
099        return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, "");
100    }
101
102    /**
103     * Use a cache to return the last committed offset, concurrent consumer is not taken in account use
104     * {@link #readLastCommittedOffset()} in concurrency.
105     */
106    public long getLastCommittedOffset() {
107        if (lastCommittedOffset > 0) {
108            return lastCommittedOffset;
109        }
110        return readLastCommittedOffset();
111    }
112
113    /**
114     * Read the last committed offset from the file.
115     */
116    public long readLastCommittedOffset() {
117        ExcerptTailer offsetTailer;
118        try {
119            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
120        } catch (IllegalStateException e) {
121            // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized
122            log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.",
123                    offsetQueue.file().getAbsolutePath(), e.getMessage()));
124            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
125        }
126        if (offsetTailer.state() == TailerState.UNINITIALISED) {
127            // This is a new queue, we are not going to find anything
128            return 0;
129        }
130        final long[] offset = { 0 };
131        boolean hasNext;
132        do {
133            hasNext = offsetTailer.readBytes(b -> {
134                int queue = b.readInt();
135                long off = b.readLong();
136                b.readLong(); // stamp not used
137                if (partition == queue) {
138                    offset[0] = off;
139                }
140            });
141        } while (offset[0] == 0 && hasNext);
142        return offset[0];
143    }
144
145    public void commit(long offset) {
146        ExcerptAppender appender = offsetQueue.acquireAppender();
147        appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis()));
148        lastCommittedOffset = offset;
149    }
150
151    @Override
152    public void close() {
153        if (!offsetQueue.isClosed()) {
154            offsetQueue.close();
155        }
156    }
157
158}