001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.MSG_KEY;
023
024import java.io.Externalizable;
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.codec.Codec;
038import org.nuxeo.lib.stream.log.LogOffset;
039import org.nuxeo.lib.stream.log.LogPartition;
040import org.nuxeo.lib.stream.log.LogRecord;
041import org.nuxeo.lib.stream.log.LogTailer;
042import org.nuxeo.lib.stream.log.Name;
043import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
044import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
045
046import net.openhft.chronicle.queue.ExcerptTailer;
047import net.openhft.chronicle.queue.TailerState;
048
049/**
050 * @since 9.3
051 */
052public class ChronicleLogTailer<M extends Externalizable> implements LogTailer<M> {
053    protected static final long POLL_INTERVAL_MS = 100L;
054
055    // keep track of all tailers on the same namespace index even from different log
056    protected static final Set<LogPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<>());
057
058    private static final Log log = LogFactory.getLog(ChronicleLogTailer.class);
059
060    protected final String basePath;
061
062    protected final ExcerptTailer cqTailer;
063
064    protected ChronicleLogOffsetTracker offsetTracker;
065
066    protected final ChronicleRetentionDuration retention;
067
068    protected final LogPartitionGroup id;
069
070    protected final LogPartition partition;
071
072    protected final Codec<M> codec;
073
074    protected volatile boolean closed = false;
075
076    protected boolean initialized;
077
078    public ChronicleLogTailer(Codec<M> codec, String basePath, ExcerptTailer cqTailer, LogPartition partition,
079                              Name group, ChronicleRetentionDuration retention) {
080        Objects.requireNonNull(group);
081        this.codec = codec;
082        this.basePath = basePath;
083        this.cqTailer = cqTailer;
084        this.partition = partition;
085        this.retention = retention;
086        this.id = new LogPartitionGroup(group, partition.name(), partition.partition());
087        registerTailer();
088    }
089
090    protected ChronicleLogOffsetTracker getOffsetTracker() {
091        if (offsetTracker == null) {
092            offsetTracker = new ChronicleLogOffsetTracker(basePath, partition.partition(), group(), retention);
093        }
094        return offsetTracker;
095    }
096
097    protected void registerTailer() {
098        if (!tailersId.add(id)) {
099            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
100        }
101    }
102
103    protected void checkInitialized() {
104        if (initialized) {
105            return;
106        }
107        toLastCommitted();
108    }
109
110    protected void unregisterTailer() {
111        tailersId.remove(id);
112    }
113
114    @Override
115    public LogRecord<M> read(Duration timeout) throws InterruptedException {
116        LogRecord<M> ret = read();
117        if (ret != null) {
118            return ret;
119        }
120        long timeoutMs = timeout.toMillis();
121        long deadline = System.currentTimeMillis() + timeoutMs;
122        long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
123        while (ret == null && System.currentTimeMillis() < deadline) {
124            Thread.sleep(delay);
125            ret = read();
126        }
127        return ret;
128    }
129
130    @SuppressWarnings("unchecked")
131    protected LogRecord<M> read() {
132        if (closed) {
133            throw new IllegalStateException("The tailer has been closed.");
134        }
135        checkInitialized();
136        List<M> value = new ArrayList<>(1);
137        AtomicLong offset = new AtomicLong();
138        if (NO_CODEC.equals(codec)) {
139            // default format to keep backward compatibility
140            try {
141                if (!cqTailer.readDocument(w -> {
142                    offset.set(cqTailer.index());
143                    value.add((M) w.read(MSG_KEY).object());
144                })) {
145                    return null;
146                }
147            } catch (ClassCastException e) {
148                throw new IllegalArgumentException(e);
149            }
150        } else {
151            if (!cqTailer.readDocument(w -> {
152                offset.set(cqTailer.index());
153                value.add(codec.decode(w.read().bytes()));
154            })) {
155                return null;
156            }
157        }
158        return new LogRecord<>(value.get(0), new LogOffsetImpl(partition, offset.get()));
159    }
160
161    @Override
162    public LogOffset commit(LogPartition partition) {
163        checkInitialized();
164        if (!this.partition.equals(partition)) {
165            throw new IllegalArgumentException("Cannot commit this partition: " + partition + " from " + id);
166        }
167        long offset = cqTailer.index();
168        getOffsetTracker().commit(offset);
169        if (log.isTraceEnabled()) {
170            log.trace(String.format("Commit %s:+%d", id, offset));
171        }
172        return new LogOffsetImpl(partition, offset);
173    }
174
175    @Override
176    public void commit() {
177        commit(partition);
178    }
179
180    @Override
181    public void toEnd() {
182        log.debug(String.format("toEnd: %s", id));
183        cqTailer.toEnd();
184        initialized = true;
185    }
186
187    @Override
188    public void toStart() {
189        log.debug(String.format("toStart: %s", id));
190        cqTailer.toStart();
191        if (!cqTailer.state().equals(TailerState.FOUND_CYCLE)) {
192            log.info("Unable to move to start because the tailer is not initialized, " + this);
193        }
194        initialized = true;
195    }
196
197    @Override
198    public void toLastCommitted() {
199        log.debug(String.format("toLastCommitted: %s", id));
200        long offset = getOffsetTracker().getLastCommittedOffset();
201        if (offset > 0) {
202            log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
203            if (!cqTailer.moveToIndex(offset) && cqTailer.index() != offset) {
204                // sometime moveToIndex returns false but offset is moved
205                toStart();
206                long startOffset = cqTailer.index();
207                if (offset < startOffset) {
208                    log.error("The last committed offset: " + offset + " for tailer: " + this
209                            + " points to a record that has been deleted by the retention policy."
210                            + " Records have been lost, continuing from the beginning of the partition offset: "
211                            + startOffset);
212                } else {
213                    // probably a corrupted Log where some cq4 files are missing
214                    throw new IllegalStateException(
215                            "Unable to move to the last committed offset: " + offset + " for tailer: " + this);
216                }
217            }
218        } else {
219            log.debug(String.format("toLastCommitted: %s, not found, move toStart", id));
220            cqTailer.toStart();
221        }
222        initialized = true;
223    }
224
225    @Override
226    public void seek(LogOffset offset) {
227        if (!this.partition.equals(offset.partition())) {
228            throw new IllegalStateException(
229                    "Cannot seek, tailer " + this + " has no assignment for partition: " + offset);
230        }
231        log.debug("Seek to " + offset + " from tailer: " + this);
232        if (!cqTailer.moveToIndex(offset.offset()) && cqTailer.index() != offset.offset()) {
233            throw new IllegalStateException("Unable to seek to offset, " + this + " offset: " + offset);
234        }
235        initialized = true;
236    }
237
238    @Override
239    public void reset() {
240        reset(new LogPartition(id.name, id.partition));
241    }
242
243    @Override
244    public void reset(LogPartition partition) {
245        if (!this.partition.equals(partition)) {
246            throw new IllegalArgumentException("Cannot reset this partition: " + partition + " from " + id);
247        }
248        log.info("Reset offset for partition: " + partition + " from tailer: " + this);
249        cqTailer.toStart();
250        initialized = true;
251        commit(partition);
252    }
253
254    @Override
255    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
256        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
257    }
258
259    @Override
260    public Collection<LogPartition> assignments() {
261        return Collections.singletonList(new LogPartition(id.name, id.partition));
262    }
263
264    @Override
265    public Name group() {
266        return id.group;
267    }
268
269    @Override
270    public void close() {
271        if (!closed) {
272            log.debug("Closing: " + toString());
273            if (offsetTracker != null) {
274                offsetTracker.close();
275                offsetTracker = null;
276            }
277            unregisterTailer();
278            closed = true;
279            initialized = false;
280        }
281    }
282
283    @Override
284    public boolean closed() {
285        return closed;
286    }
287
288    @Override
289    public Codec<M> getCodec() {
290        return codec;
291    }
292
293    @Override
294    public String toString() {
295        return "ChronicleLogTailer{" + "basePath='" + basePath + '\'' + ", id=" + id + ", closed=" + closed + ", codec="
296                + codec + '}';
297    }
298
299}