001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
022
023import java.io.File;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.stream.Stream;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import net.openhft.chronicle.queue.ExcerptAppender;
033import net.openhft.chronicle.queue.ExcerptTailer;
034import net.openhft.chronicle.queue.TailerDirection;
035import net.openhft.chronicle.queue.TailerState;
036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
038
039/**
040 * Track committed offset for a Log.
041 *
042 * @since 9.3
043 */
044public class ChronicleLogOffsetTracker implements AutoCloseable {
045    private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class);
046
047    protected static final String OFFSET_QUEUE_PREFIX = "offset-";
048
049    // message are small, minimum block size of 256K is buggy so take the size above
050    // this will create a cq4 file of 1.3MB max message size is around 256KB
051    public static final int CQ_BLOCK_SIZE = 1_048_576;
052
053    protected final SingleChronicleQueue offsetQueue;
054
055    protected final int partition;
056
057    protected long lastCommittedOffset;
058
059    protected final ChronicleRetentionDuration retention;
060
061    public ChronicleLogOffsetTracker(String basePath, int partition, String group,
062            ChronicleRetentionDuration retention) {
063        this.partition = partition;
064        this.retention = retention;
065        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group);
066        ChronicleRetentionListener listener = null;
067        SingleChronicleQueueBuilder builder = binary(offsetFile).rollCycle(retention.getRollCycle())
068                                                                .blockSize(CQ_BLOCK_SIZE);
069        if (!retention.disable() && partition == 0) {
070            // offset queue is shared among partitions
071            // only the first partition handle the retention
072            listener = new ChronicleRetentionListener(retention);
073            builder.storeFileListener(listener);
074
075        }
076        offsetQueue = builder.build();
077        if (listener != null) {
078            listener.setQueue(offsetQueue);
079        }
080    }
081
082    public static boolean exists(Path basePath, String group) {
083        try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group))) {
084            return paths.count() > 0;
085        } catch (IOException e) {
086            return false;
087        }
088    }
089
090    public static boolean isOffsetTracker(String dirName) {
091        return dirName.startsWith(OFFSET_QUEUE_PREFIX);
092    }
093
094    public static String getGroupFromDirectory(String dirName) {
095        if (!isOffsetTracker(dirName)) {
096            throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName));
097        }
098        return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, "");
099    }
100
101    /**
102     * Use a cache to return the last committed offset, concurrent consumer is not taken in account use
103     * {@link #readLastCommittedOffset()} in concurrency.
104     */
105    public long getLastCommittedOffset() {
106        if (lastCommittedOffset > 0) {
107            return lastCommittedOffset;
108        }
109        return readLastCommittedOffset();
110    }
111
112    /**
113     * Read the last committed offset from the file.
114     */
115    public long readLastCommittedOffset() {
116        ExcerptTailer offsetTailer;
117        try {
118            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
119        } catch (IllegalStateException e) {
120            // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized
121            log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.",
122                    offsetQueue.file().getAbsolutePath(), e.getMessage()));
123            offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd();
124        }
125        if (offsetTailer.state() == TailerState.UNINITIALISED) {
126            // This is a new queue, we are not going to find anything
127            return 0;
128        }
129        final long[] offset = { 0 };
130        boolean hasNext;
131        do {
132            hasNext = offsetTailer.readBytes(b -> {
133                int queue = b.readInt();
134                long off = b.readLong();
135                b.readLong(); // stamp not used
136                if (partition == queue) {
137                    offset[0] = off;
138                }
139            });
140        } while (offset[0] == 0 && hasNext);
141        return offset[0];
142    }
143
144    public void commit(long offset) {
145        ExcerptAppender appender = offsetQueue.acquireAppender();
146        appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis()));
147        lastCommittedOffset = offset;
148    }
149
150    @Override
151    public void close() {
152        if (!offsetQueue.isClosed()) {
153            offsetQueue.close();
154        }
155    }
156
157}