001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
022
023import java.io.File;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.stream.Stream;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.lib.stream.log.Name;
032
033import net.openhft.chronicle.queue.ExcerptAppender;
034import net.openhft.chronicle.queue.ExcerptTailer;
035import net.openhft.chronicle.queue.TailerDirection;
036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
038
039/**
040 * Track committed offset for a Log.
041 *
042 * @since 9.3
043 */
044public class ChronicleLogOffsetTracker implements AutoCloseable {
045    private static final Log log = LogFactory.getLog(ChronicleLogOffsetTracker.class);
046
047    protected static final String OFFSET_QUEUE_PREFIX = "offset-";
048
049    // message are small, minimum block size of 256K is buggy so take the size above
050    // this will create a cq4 file of 1.3MB max message size is around 256KB
051    public static final int CQ_BLOCK_SIZE = 1_048_576;
052
053    protected final SingleChronicleQueue offsetQueue;
054
055    protected final int partition;
056
057    protected long lastCommittedOffset;
058
059    protected final ChronicleRetentionDuration retention;
060
061    public ChronicleLogOffsetTracker(String basePath, int partition, Name group,
062            ChronicleRetentionDuration retention) {
063        this.partition = partition;
064        this.retention = retention;
065        File offsetFile = new File(basePath, OFFSET_QUEUE_PREFIX + group.getId());
066        ChronicleRetentionListener listener = null;
067        SingleChronicleQueueBuilder builder = binary(offsetFile).rollCycle(retention.getRollCycle())
068                                                                .blockSize(CQ_BLOCK_SIZE);
069        if (!retention.disable() && partition == 0) {
070            // offset queue is shared among partitions
071            // only the first partition handle the retention
072            listener = new ChronicleRetentionListener(retention);
073            builder.storeFileListener(listener);
074
075        }
076        offsetQueue = builder.build();
077        if (listener != null) {
078            listener.setQueue(offsetQueue);
079        }
080    }
081
082    public static boolean exists(Path basePath, Name group) {
083        try (Stream<Path> paths = Files.list(basePath.resolve(OFFSET_QUEUE_PREFIX + group.getId()))) {
084            return paths.count() > 0;
085        } catch (IOException e) {
086            return false;
087        }
088    }
089
090    public static boolean isOffsetTracker(String dirName) {
091        return dirName.startsWith(OFFSET_QUEUE_PREFIX);
092    }
093
094    public static String getGroupFromDirectory(String dirName) {
095        if (!isOffsetTracker(dirName)) {
096            throw new IllegalArgumentException(String.format("Invalid directory %s, not an offset tracker", dirName));
097        }
098        return dirName.replaceFirst(OFFSET_QUEUE_PREFIX, "");
099    }
100
101    /**
102     * Use a cache to return the last committed offset, concurrent consumer is not taken in account use
103     * {@link #readLastCommittedOffset()} in concurrency.
104     */
105    public long getLastCommittedOffset() {
106        if (lastCommittedOffset > 0) {
107            return lastCommittedOffset;
108        }
109        return readLastCommittedOffset();
110    }
111
112    /**
113     * Read the last committed offset from the file.
114     */
115    public long readLastCommittedOffset() {
116        try {
117            return doReadLastCommittedOffset();
118        } catch (IllegalStateException e) {
119            // sometime the end is NOT_REACHED, may be because the queue is not yet fully initialized
120            log.warn(String.format("Fail to reach the end of offset queue: %s because of: %s, retrying.",
121                    offsetQueue.file().getAbsolutePath(), e.getMessage()));
122            // try again
123            return doReadLastCommittedOffset();
124        }
125    }
126
127    protected long doReadLastCommittedOffset() {
128        try(ExcerptTailer offsetTailer = offsetQueue.createTailer().direction(TailerDirection.BACKWARD).toEnd()) {
129            switch (offsetTailer.state()) {
130                case FOUND_CYCLE:
131                    // expected case continue
132                    break;
133                case UNINITIALISED:
134                    // This is a new queue, we are not going to find anything
135                    return 0;
136                default:
137                    // border line cases that happens on unit tests and where the queue is not yet ready
138                    log.info("Invalid offset tailer state: " + offsetQueue.file().getAbsolutePath() + ": "
139                            + offsetTailer.state() + " taken as uninitialized");
140                    return 0;
141            }
142            final long[] offset = { 0 };
143            boolean hasNext;
144            do {
145                hasNext = offsetTailer.readBytes(b -> {
146                    int queue = b.readInt();
147                    long off = b.readLong();
148                    b.readLong(); // stamp not used
149                    if (partition == queue) {
150                        offset[0] = off;
151                    }
152                });
153            } while (offset[0] == 0 && hasNext);
154            return offset[0];
155        }
156    }
157
158    public void commit(long offset) {
159        ExcerptAppender appender = offsetQueue.acquireAppender();
160        appender.writeBytes(b -> b.writeInt(partition).writeLong(offset).writeLong(System.currentTimeMillis()));
161        lastCommittedOffset = offset;
162    }
163
164    @Override
165    public void close() {
166        if (!offsetQueue.isClosed()) {
167            offsetQueue.close();
168        }
169    }
170
171}