001package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020import net.openhft.chronicle.queue.ExcerptTailer;
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;
028import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQPartitionGroup;
029
030import java.io.Externalizable;
031import java.time.Duration;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.List;
036import java.util.Objects;
037import java.util.Set;
038import java.util.concurrent.ConcurrentHashMap;
039
040/**
041 * @since 9.1
042 */
043public class ChronicleMQTailer<M extends Externalizable> implements MQTailer<M> {
044    private static final Log log = LogFactory.getLog(ChronicleMQTailer.class);
045    protected static final long POLL_INTERVAL_MS = 100L;
046
047    private final String basePath;
048    private final ExcerptTailer cqTailer;
049    private final ChronicleMQOffsetTracker offsetTracker;
050    private final MQPartitionGroup id;
051    private final MQPartition partition;
052    private boolean closed = false;
053
054    // keep track of all tailers on the same namespace index even from different mq
055    private static final Set<MQPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<MQPartitionGroup, Boolean>());
056
057    public ChronicleMQTailer(String basePath, ExcerptTailer cqTailer, MQPartition partition, String group) {
058        Objects.requireNonNull(group);
059        this.basePath = basePath;
060        this.cqTailer = cqTailer;
061        this.partition = partition;
062        this.id = new MQPartitionGroup(group, partition.name(), partition.partition());
063        registerTailer();
064        this.offsetTracker = new ChronicleMQOffsetTracker(basePath, partition.partition(), group);
065        toLastCommitted();
066    }
067
068    private void registerTailer() {
069        if (!tailersId.add(id)) {
070            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id);
071        }
072    }
073
074    private void unregisterTailer() {
075        tailersId.remove(id);
076    }
077
078    @Override
079    public MQRecord<M> read(Duration timeout) throws InterruptedException {
080        MQRecord<M> ret = read();
081        if (ret != null) {
082            return ret;
083        }
084        final long timeoutMs = timeout.toMillis();
085        final long deadline = System.currentTimeMillis() + timeoutMs;
086        final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
087        while (ret == null && System.currentTimeMillis() < deadline) {
088            Thread.sleep(delay);
089            ret = read();
090        }
091        return ret;
092    }
093
094
095    @SuppressWarnings("unchecked")
096    protected MQRecord<M> read() {
097        if (closed) {
098            throw new IllegalStateException("The tailer has been closed.");
099        }
100        final List<M> value = new ArrayList<>(1);
101        if (!cqTailer.readDocument(w -> value.add((M) w.read("msg").object()))) {
102            return null;
103
104        }
105        return new MQRecord<>(partition, value.get(0), new MQOffsetImpl(partition, cqTailer.index()));
106    }
107
108    @Override
109    public MQOffset commit(MQPartition partition) {
110        // we write raw: queue, offset, timestamp
111        if (!this.partition.equals(partition)) {
112            throw new IllegalArgumentException("Can not commit this partition: " + partition + " from " + id);
113        }
114        long offset = cqTailer.index();
115        offsetTracker.commit(offset);
116        if (log.isTraceEnabled()) {
117            log.trace(String.format("Commit %s:+%d", id, offset));
118        }
119        return new MQOffsetImpl(partition, offset);
120    }
121
122    @Override
123    public void commit() {
124        commit(partition);
125    }
126
127    @Override
128    public void toEnd() {
129        log.debug(String.format("toEnd: %s", id));
130        cqTailer.toEnd();
131    }
132
133    @Override
134    public void toStart() {
135        log.debug(String.format("toStart: %s", id));
136        cqTailer.toStart();
137    }
138
139    @Override
140    public void toLastCommitted() {
141        long offset = offsetTracker.getLastCommittedOffset();
142        if (offset > 0) {
143            log.debug(String.format("toLastCommitted: %s, found: %d", id, offset));
144            cqTailer.moveToIndex(offset);
145        } else {
146            log.debug(String.format("toLastCommitted: %s not found, run from beginning", id));
147            cqTailer.toStart();
148        }
149    }
150
151    public void seek(MQPartition partition, MQOffset offset) {
152        cqTailer.moveToIndex(offset.offset());
153    }
154
155    @Override
156    public Collection<MQPartition> assignments() {
157        return Collections.singletonList(new MQPartition(id.name, id.partition));
158    }
159
160    @Override
161    public String group() {
162        return id.group;
163    }
164
165    @Override
166    public void close() throws Exception {
167        offsetTracker.close();
168        unregisterTailer();
169        closed = true;
170    }
171
172    @Override
173    public boolean closed() {
174        return closed;
175    }
176
177    @Override
178    public String toString() {
179        return "ChronicleMQTailer{" +
180                "basePath='" + basePath + '\'' +
181                ", id=" + id +
182                ", closed=" + closed +
183                '}';
184    }
185
186}