001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY;
023
024import java.io.Externalizable;
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.log.LogOffset;
038import org.nuxeo.lib.stream.log.LogPartition;
039import org.nuxeo.lib.stream.log.LogRecord;
040import org.nuxeo.lib.stream.log.LogTailer;
041import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
042import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
043
044import net.openhft.chronicle.queue.ExcerptTailer;
045import net.openhft.chronicle.queue.TailerState;
046
047/**
048 * @since 9.3
049 */
050public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
051    protected static final long POLL_INTERVAL_MS = 100L;
052
053    // keep track of all tailers on the same namespace index even from different log
054    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(
055            new ConcurrentHashMap<LogPartitionGroup, Boolean>());
056
057    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);
058
059    protected final String basePath;
060
061    protected final ExcerptTailer cqTailer;
062
063    protected final ChronicleLogOffsetTracker offsetTracker;
064
065    protected final LogPartitionGroup id;
066
067    protected final LogPartition partition;
068
069    protected final Codec<M> codec;
070
071    protected volatile boolean closed = false;
072
073    public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition,
074            String group, ChronicleRetentionDuration retention) {
075        Objects.requireNonNull(group);
076        this.codec = codec;
077        this.basePath = basePath;
078        this.cqTailer = cqTailer;
079        this.partition = partition;
080        this.id = new LogPartitionGroup(group, partition.name(), partition.partition());
081        registerTailer();
082        this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention);
083        toLastCommitted();
084    }
085
086    protected void registerTailer() {
087        if (!tailersId.add(id)) {
088            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
089        }
090    }
091
092    protected void unregisterTailer() {
093        tailersId.remove(id);
094    }
095
096    @Override
097    public LogRecord<M> read(Duration timeout) throws InterruptedException {
098        LogRecord<M> ret = read();
099        if (ret != null) {
100            return ret;
101        }
102        long timeoutMs = timeout.toMillis();
103        long deadline = System.currentTimeMillis() + timeoutMs;
104        long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
105        while (ret == null && System.currentTimeMillis() < deadline) {
106            Thread.sleep(delay);
107            ret = read();
108        }
109        return ret;
110    }
111
112    @SuppressWarnings("unchecked")
113    protected LogRecord<M> read() {
114        if (closed) {
115            throw new IllegalStateException("The tailer has been closed.");
116        }
117        List<M> value = new ArrayList<>(1);
118        long offset = cqTailer.index();
119        if (NO_CODEC.equals(codec)) {
120            // default format to keep backward compatibility
121            try {
122                if (!cqTailer.readDocument(w -> value.add((M) w.read(MSG_KEY).object()))) {
123                    return null;
124                }
125            } catch (ClassCastException e) {
126                throw new IllegalArgumentException(e);
127            }
128        } else {
129            if (!cqTailer.readDocument(w -> value.add(codec.decode(w.read().bytes())))) {
130                return null;
131            }
132        }
133        return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset));
134    }
135
136    @Override
137    public LogOffset commit(LogPartition partition) {
138        if (!this.partition.equals(partition)) {
139            throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id);
140        }
141        long offset = cqTailer.index();
142        offsetTracker.commit(offset);
143        if (log.isTraceEnabled()) {
144            log.trace(String.format("Commit %s:+%d", id, offset));
145        }
146        return new LogOffsetImpl(partition, offset);
147    }
148
149    @Override
150    public void commit() {
151        commit(partition);
152    }
153
154    @Override
155    public void toEnd() {
156        log.debug(String.format("toEnd: %s", id));
157        cqTailer.toEnd();
158    }
159
160    @Override
161    public void toStart() {
162        log.debug(String.format("toStart: %s", id));
163        cqTailer.toStart();
164        if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) {
165            log.info("Unable to move to start because the tailer is not initialized, " + this);
166        }
167    }
168
169    @Override
170    public void toLastCommitted() {
171        long offset = offsetTracker.getLastCommittedOffset();
172        if (offset > 0) {
173            log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
174            if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) {
175                // sometime moveToIndex returns false but offset is moved
176                throw new IllegalStateException(
177                        "Unable to move to the last committed offset, " + this + " offset: " + offset);
178            }
179        } else {
180            log.debug(String.format("toLastCommitted: %s, not found, move toStart", id));
181            cqTailer.toStart();
182        }
183    }
184
185    @Override
186    public void seek(LogOffset offset) {
187        if (!this.partition.equals(offset.partition())) {
188            throw new IllegalStateException(
189                    "Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
190        }
191        log.debug("Seek to " + offset + " from tailer: " + this);
192        if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) {
193            throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset);
194        }
195    }
196
197    @Override
198    public void reset() {
199        reset(new LogPartition(id.name, id.partition));
200    }
201
202    @Override
203    public void reset(LogPartition partition) {
204        if (!this.partition.equals(partition)) {
205            throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id);
206        }
207        log.info("Reset offset for partition: " + partition + " from tailer: " + this);
208        cqTailer.toStart();
209        commit(partition);
210    }
211
212    @Override
213    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
214        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
215    }
216
217    @Override
218    public Collection<LogPartition> assignments() {
219        return Collections.singletonList(new LogPartition(id.name, id.partition));
220    }
221
222    @Override
223    public String group() {
224        return id.group;
225    }
226
227    @Override
228    public void close() {
229        if (!closed) {
230            log.debug("Closing: " + toString());
231            offsetTracker.close();
232            unregisterTailer();
233            closed = true;
234        }
235    }
236
237    @Override
238    public boolean closed() {
239        return closed;
240    }
241
242    @Override
243    public Codec<M> getCodec() {
244        return codec;
245    }
246
247    @Override
248    public String toString() {
249        return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec="
250                + codec + '}';
251    }
252
253}