001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
022
023import java.io.File;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031import net.openhft.chronicle.queue.ExcerptAppender;
032import net.openhft.chronicle.queue.ExcerptTailer;
033import net.openhft.chronicle.queue.TailerDirection;
034import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
036
037/**
038 * Track committed offset for a Log.
039 *
040 * @since 9.3
041 */
042public class ChronicleLogOffsetTracker implements AutoCloseable {
043    private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class);
044
045    protected static final String OFFSET_QUEUE_PREFIX = "offset-";
046
047    protected final SingleChronicleQueue offsetQueue;
048
049    protected final int partition;
050
051    protected long lastCommittedOffset;
052
053    protected final ChronicleRetentionDuration retention;
054
055    public ChronicleLogOffsetTracker(String basePath, int partition, String group,
056            ChronicleRetentionDuration retention) {
057        this.partition = partition;
058        this.retention = retention;
059        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group);
060        SingleChronicleQueueBuilder builder = binary(offsetFile);
061        ChronicleRetentionListener listener = null;
062        if (!retention.disable()) {
063            builder.rollCycle(retention.getRollCycle());
064            if (partition == 0) {
065                // offset queue is shared among partitions
066                // only the first partition handle the retention
067                listener = new ChronicleRetentionListener(retention);
068                builder.storeFileListener(listener);
069            }
070        }
071        offsetQueue = builder.build();
072        if (listener != null) {
073            listener.setQueue(offsetQueue);
074        }
075    }
076
077    public ChronicleLogOffsetTracker(String basePath, int partition, String group) {
078        this(basePath, partition, group, ChronicleRetentionDuration.DISABLE);
079    }
080
081    public static boolean exists(Path basePath, String group) {
082        try {
083            return Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group)).count() > 0;
084        } catch (IOException e) {
085            return false;
086        }
087    }
088
089    public static boolean isOffsetTracker(String dirName) {
090        return dirName.startsWith(OFFSET_QUEUE_PREFIX);
091    }
092
093    public static String getGroupFromDirectory(String dirName) {
094        if (!isOffsetTracker(dirName)) {
095            throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName));
096        }
097        return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, "");
098    }
099
100    /**
101     * Use a cache to return the last committed offset, concurrent consumer is not taken in account use
102     * {@link #readLastCommittedOffset()} in concurrency.
103     */
104    public long getLastCommittedOffset() {
105        if (lastCommittedOffset > 0) {
106            return lastCommittedOffset;
107        }
108        return readLastCommittedOffset();
109    }
110
111    /**
112     * Read the last committed offset from the file.
113     */
114    public long readLastCommittedOffset() {
115        ExcerptTailer offsetTailer;
116        try {
117            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
118        } catch (IllegalStateException e) {
119            // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized
120            log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.",
121                    offsetQueue.file().getAbsolutePath(), e.getMessage()));
122            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
123        }
124        final long[] offset = { 0 };
125        boolean hasNext;
126        do {
127            hasNext = offsetTailer.readBytes(b -> {
128                int queue = b.readInt();
129                long off = b.readLong();
130                b.readLong(); // stamp not used
131                if (partition == queue) {
132                    offset[0] = off;
133                }
134            });
135        } while (offset[0] == 0 && hasNext);
136        // System.out.println("last committed returned from: " + offsetQueue.file() + " " + offset[0] + " hasNext: " +
137        // hasNext);
138        return offset[0];
139    }
140
141    public void commit(long offset) {
142        ExcerptAppender appender = offsetQueue.acquireAppender();
143        appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis()));
144        // System.out.println(String.format("COMMIT %s, partition: %s, offset: %s, pos: %s",
145        // offsetQueue.file(), partition, offset, appender.lastIndexAppended()));
146        lastCommittedOffset = offset;
147    }
148
149    @Override
150    public void close() {
151        offsetQueue.close();
152    }
153
154}