001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY;
023
024import java.io.Externalizable;
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.log.LogOffset;
038import org.nuxeo.lib.stream.log.LogPartition;
039import org.nuxeo.lib.stream.log.LogRecord;
040import org.nuxeo.lib.stream.log.LogTailer;
041import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
042import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
043
044import net.openhft.chronicle.queue.ExcerptTailer;
045import net.openhft.chronicle.queue.TailerState;
046
047/**
048 * @since 9.3
049 */
050public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
051    protected static final long POLL_INTERVAL_MS = 100L;
052
053    // keep track of all tailers on the same namespace index even from different log
054    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<>());
055
056    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);
057
058    protected final String basePath;
059
060    protected final ExcerptTailer cqTailer;
061
062    protected final ChronicleLogOffsetTracker offsetTracker;
063
064    protected final LogPartitionGroup id;
065
066    protected final LogPartition partition;
067
068    protected final Codec<M> codec;
069
070    protected volatile boolean closed = false;
071
072    public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition,
073            String group, ChronicleRetentionDuration retention) {
074        Objects.requireNonNull(group);
075        this.codec = codec;
076        this.basePath = basePath;
077        this.cqTailer = cqTailer;
078        this.partition = partition;
079        this.id = new LogPartitionGroup(group, partition.name(), partition.partition());
080        registerTailer();
081        this.offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group, retention);
082        toLastCommitted();
083    }
084
085    protected void registerTailer() {
086        if (!tailersId.add(id)) {
087            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
088        }
089    }
090
091    protected void unregisterTailer() {
092        tailersId.remove(id);
093    }
094
095    @Override
096    public LogRecord<M> read(Duration timeout) throws InterruptedException {
097        LogRecord<M> ret = read();
098        if (ret != null) {
099            return ret;
100        }
101        long timeoutMs = timeout.toMillis();
102        long deadline = System.currentTimeMillis() + timeoutMs;
103        long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
104        while (ret == null && System.currentTimeMillis() < deadline) {
105            Thread.sleep(delay);
106            ret = read();
107        }
108        return ret;
109    }
110
111    @SuppressWarnings("unchecked")
112    protected LogRecord<M> read() {
113        if (closed) {
114            throw new IllegalStateException("The tailer has been closed.");
115        }
116        List<M> value = new ArrayList<>(1);
117        long offset = cqTailer.index();
118        if (NO_CODEC.equals(codec)) {
119            // default format to keep backward compatibility
120            try {
121                if (!cqTailer.readDocument(w -> value.add((M) w.read(MSG_KEY).object()))) {
122                    return null;
123                }
124            } catch (ClassCastException e) {
125                throw new IllegalArgumentException(e);
126            }
127        } else {
128            if (!cqTailer.readDocument(w -> value.add(codec.decode(w.read().bytes())))) {
129                return null;
130            }
131        }
132        return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset));
133    }
134
135    @Override
136    public LogOffset commit(LogPartition partition) {
137        if (!this.partition.equals(partition)) {
138            throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id);
139        }
140        long offset = cqTailer.index();
141        offsetTracker.commit(offset);
142        if (log.isTraceEnabled()) {
143            log.trace(String.format("Commit %s:+%d", id, offset));
144        }
145        return new LogOffsetImpl(partition, offset);
146    }
147
148    @Override
149    public void commit() {
150        commit(partition);
151    }
152
153    @Override
154    public void toEnd() {
155        log.debug(String.format("toEnd: %s", id));
156        cqTailer.toEnd();
157    }
158
159    @Override
160    public void toStart() {
161        log.debug(String.format("toStart: %s", id));
162        cqTailer.toStart();
163        if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) {
164            log.info("Unable to move to start because the tailer is not initialized, " + this);
165        }
166    }
167
168    @Override
169    public void toLastCommitted() {
170        long offset = offsetTracker.getLastCommittedOffset();
171        if (offset > 0) {
172            log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
173            if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) {
174                // sometime moveToIndex returns false but offset is moved
175                throw new IllegalStateException(
176                        "Unable to move to the last committed offset, " + this + " offset: " + offset);
177            }
178        } else {
179            log.debug(String.format("toLastCommitted: %s, not found, move toStart", id));
180            cqTailer.toStart();
181        }
182    }
183
184    @Override
185    public void seek(LogOffset offset) {
186        if (!this.partition.equals(offset.partition())) {
187            throw new IllegalStateException(
188                    "Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
189        }
190        log.debug("Seek to " + offset + " from tailer: " + this);
191        if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) {
192            throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset);
193        }
194    }
195
196    @Override
197    public void reset() {
198        reset(new LogPartition(id.name, id.partition));
199    }
200
201    @Override
202    public void reset(LogPartition partition) {
203        if (!this.partition.equals(partition)) {
204            throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id);
205        }
206        log.info("Reset offset for partition: " + partition + " from tailer: " + this);
207        cqTailer.toStart();
208        commit(partition);
209    }
210
211    @Override
212    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
213        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
214    }
215
216    @Override
217    public Collection<LogPartition> assignments() {
218        return Collections.singletonList(new LogPartition(id.name, id.partition));
219    }
220
221    @Override
222    public String group() {
223        return id.group;
224    }
225
226    @Override
227    public void close() {
228        if (!closed) {
229            log.debug("Closing: " + toString());
230            offsetTracker.close();
231            unregisterTailer();
232            closed = true;
233        }
234    }
235
236    @Override
237    public boolean closed() {
238        return closed;
239    }
240
241    @Override
242    public Codec<M> getCodec() {
243        return codec;
244    }
245
246    @Override
247    public String toString() {
248        return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec="
249                + codec + '}';
250    }
251
252}