001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.Externalizable;
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.Objects;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.lib.stream.log.LogOffset;
034import org.nuxeo.lib.stream.log.LogPartition;
035import org.nuxeo.lib.stream.log.LogRecord;
036import org.nuxeo.lib.stream.log.LogTailer;
037import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
038import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
039
040import net.openhft.chronicle.queue.ExcerptTailer;
041
042/**
043 * @since 9.3
044 */
045public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
046    protected static final long POLL_INTERVAL_MS = 100L;
047
048    // keep track of all tailers on the same namespace index even from different log
049    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(
050            new ConcurrentHashMap<LogPartitionGroup, Boolean>());
051
052    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);
053
054    protected final String basePath;
055
056    protected final ExcerptTailer cqTailer;
057
058    protected final ChronicleLogOffsetTracker offsetTracker;
059
060    protected final LogPartitionGroup id;
061
062    protected final LogPartition partition;
063
064    protected volatile boolean closed = false;
065
066    public ChronicleLogTailer(String basePath, ExcerptTailer cqTailer, LogPartition partition, String group,
067            ChronicleRetentionDuration retention) {
068        Objects.requireNonNull(group);
069        this.basePath = basePath;
070        this.cqTailer = cqTailer;
071        this.partition = partition;
072        this.id = new LogPartitionGroup(group, partition.name(), partition.partition());
073        registerTailer();
074        this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention);
075        toLastCommitted();
076    }
077
078    protected void registerTailer() {
079        if (!tailersId.add(id)) {
080            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
081        }
082    }
083
084    protected void unregisterTailer() {
085        tailersId.remove(id);
086    }
087
088    @Override
089    public LogRecord<M> read(Duration timeout) throws InterruptedException {
090        LogRecord<M> ret = read();
091        if (ret != null) {
092            return ret;
093        }
094        long timeoutMs = timeout.toMillis();
095        long deadline = System.currentTimeMillis() + timeoutMs;
096        long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
097        while (ret == null && System.currentTimeMillis() < deadline) {
098            Thread.sleep(delay);
099            ret = read();
100        }
101        return ret;
102    }
103
104    @SuppressWarnings("unchecked")
105    protected LogRecord<M> read() {
106        if (closed) {
107            throw new IllegalStateException("The tailer has been closed.");
108        }
109        List<M> value = new ArrayList<>(1);
110        long offset = cqTailer.index();
111        if (!cqTailer.readDocument(w -> value.add((M) w.read("msg").object()))) {
112            return null;
113        }
114        return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset));
115    }
116
117    @Override
118    public LogOffset commit(LogPartition partition) {
119        if (!this.partition.equals(partition)) {
120            throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id);
121        }
122        long offset = cqTailer.index();
123        offsetTracker.commit(offset);
124        if (log.isTraceEnabled()) {
125            log.trace(String.format("Commit %s:+%d", id, offset));
126        }
127        return new LogOffsetImpl(partition, offset);
128    }
129
130    @Override
131    public void commit() {
132        commit(partition);
133    }
134
135    @Override
136    public void toEnd() {
137        log.debug(String.format("toEnd: %s", id));
138        cqTailer.toEnd();
139    }
140
141    @Override
142    public void toStart() {
143        log.debug(String.format("toStart: %s", id));
144        cqTailer.toStart();
145    }
146
147    @Override
148    public void toLastCommitted() {
149        long offset = offsetTracker.getLastCommittedOffset();
150        if (offset > 0) {
151            log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
152            cqTailer.moveToIndex(offset);
153        } else {
154            log.debug(String.format("toLastCommitted: %s, not found, move toStart", id));
155            cqTailer.toStart();
156        }
157    }
158
159    @Override
160    public void seek(LogOffset offset) {
161        if (!this.partition.equals(offset.partition())) {
162            throw new IllegalStateException(
163                    "Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
164        }
165        log.debug("Seek to " + offset + " from tailer: " + this);
166        cqTailer.moveToIndex(offset.offset());
167    }
168
169    @Override
170    public void reset() {
171        reset(new LogPartition(id.name, id.partition));
172    }
173
174    @Override
175    public void reset(LogPartition partition) {
176        if (!this.partition.equals(partition)) {
177            throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id);
178        }
179        log.info("Reset offset for partition: " + partition + " from tailer: " + this);
180        cqTailer.toStart();
181        commit(partition);
182    }
183
184    @Override
185    public Collection<LogPartition> assignments() {
186        return Collections.singletonList(new LogPartition(id.name, id.partition));
187    }
188
189    @Override
190    public String group() {
191        return id.group;
192    }
193
194    @Override
195    public void close() {
196        if (!closed) {
197            log.debug("Closing: " + toString());
198            offsetTracker.close();
199            unregisterTailer();
200            closed = true;
201        }
202    }
203
204    @Override
205    public boolean closed() {
206        return closed;
207    }
208
209    @Override
210    public String toString() {
211        return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + '}';
212    }
213
214}