001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
022
023import java.io.File;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.stream.Stream;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import net.openhft.chronicle.queue.ExcerptAppender;
033import net.openhft.chronicle.queue.ExcerptTailer;
034import net.openhft.chronicle.queue.TailerDirection;
035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
037
038/**
039 * Track committed offset for a Log.
040 *
041 * @since 9.3
042 */
043public class ChronicleLogOffsetTracker implements AutoCloseable {
044    private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class);
045
046    protected static final String OFFSET_QUEUE_PREFIX = "offset-";
047
048    protected final SingleChronicleQueue offsetQueue;
049
050    protected final int partition;
051
052    protected long lastCommittedOffset;
053
054    protected final ChronicleRetentionDuration retention;
055
056    public ChronicleLogOffsetTracker(String basePath, int partition, String group,
057            ChronicleRetentionDuration retention) {
058        this.partition = partition;
059        this.retention = retention;
060        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group);
061        SingleChronicleQueueBuilder builder = binary(offsetFile);
062        ChronicleRetentionListener listener = null;
063        if (!retention.disable()) {
064            builder.rollCycle(retention.getRollCycle());
065            if (partition == 0) {
066                // offset queue is shared among partitions
067                // only the first partition handle the retention
068                listener = new ChronicleRetentionListener(retention);
069                builder.storeFileListener(listener);
070            }
071        }
072        offsetQueue = builder.build();
073        if (listener != null) {
074            listener.setQueue(offsetQueue);
075        }
076    }
077
078    public ChronicleLogOffsetTracker(String basePath, int partition, String group) {
079        this(basePath, partition, group, ChronicleRetentionDuration.DISABLE);
080    }
081
082    public static boolean exists(Path basePath, String group) {
083        try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) {
084            return paths.count() > 0;
085        } catch (IOException e) {
086            return false;
087        }
088    }
089
090    public static boolean isOffsetTracker(String dirName) {
091        return dirName.startsWith(OFFSET_QUEUE_PREFIX);
092    }
093
094    public static String getGroupFromDirectory(String dirName) {
095        if (!isOffsetTracker(dirName)) {
096            throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName));
097        }
098        return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, "");
099    }
100
101    /**
102     * Use a cache to return the last committed offset, concurrent consumer is not taken in account use
103     * {@link #readLastCommittedOffset()} in concurrency.
104     */
105    public long getLastCommittedOffset() {
106        if (lastCommittedOffset > 0) {
107            return lastCommittedOffset;
108        }
109        return readLastCommittedOffset();
110    }
111
112    /**
113     * Read the last committed offset from the file.
114     */
115    public long readLastCommittedOffset() {
116        ExcerptTailer offsetTailer;
117        try {
118            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
119        } catch (IllegalStateException e) {
120            // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized
121            log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.",
122                    offsetQueue.file().getAbsolutePath(), e.getMessage()));
123            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
124        }
125        final long[] offset = { 0 };
126        boolean hasNext;
127        do {
128            hasNext = offsetTailer.readBytes(b -> {
129                int queue = b.readInt();
130                long off = b.readLong();
131                b.readLong(); // stamp not used
132                if (partition == queue) {
133                    offset[0] = off;
134                }
135            });
136        } while (offset[0] == 0 && hasNext);
137        // System.out.println("last committed returned from: " + offsetQueue.file() + " " + offset[0] + " hasNext: " +
138        // hasNext);
139        return offset[0];
140    }
141
142    public void commit(long offset) {
143        ExcerptAppender appender = offsetQueue.acquireAppender();
144        appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis()));
145        // System.out.println(String.format("COMMIT %s, partition: %s, offset: %s, pos: %s",
146        // offsetQueue.file(), partition, offset, appender.lastIndexAppended()));
147        lastCommittedOffset = offset;
148    }
149
150    @Override
151    public void close() {
152        offsetQueue.close();
153    }
154
155}