001package org.nuxeo.ecm.platform.importer.mqueues.mqueues;/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020import net.openhft.chronicle.queue.ExcerptTailer;
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
024
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031
032/**
033 * @since 9.1
034 */
035public class CQTailer<M extends Message> implements MQueues.Tailer<M> {
036    private static final Log log = LogFactory.getLog(CQTailer.class);
037    private static final long POLL_INTERVAL_MS = 100L;
038    public static final String DEFAULT_OFFSET_NAMESPACE = "default";
039
040    private final String basePath;
041    private final ExcerptTailer tailer;
042    private final String nameSpace;
043    private final int queueIndex;
044    private final CQOffsetTracker offsetTracker;
045    private boolean closed = false;
046
047    // keep track of all tailers on the same namespace index even from different mq
048    private static final Set<String> indexNamespace = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
049
050    public CQTailer(String basePath, ExcerptTailer tailer, int queue) {
051        this(basePath, tailer, queue, null);
052    }
053
054    public CQTailer(String basePath, ExcerptTailer tailer, int queue, String nameSpace) {
055        this.basePath = basePath;
056        this.tailer = tailer;
057        this.queueIndex = queue;
058        if (nameSpace == null) {
059            this.nameSpace = DEFAULT_OFFSET_NAMESPACE;
060        } else {
061            this.nameSpace = nameSpace;
062        }
063        registerTailer();
064        this.offsetTracker = new CQOffsetTracker(basePath, queue, this.nameSpace);
065        toLastCommitted();
066    }
067
068    private void registerTailer() {
069        String key = getTailerKey();
070        if (!indexNamespace.add(key)) {
071            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + key);
072        }
073    }
074
075    private void unregisterTailer() {
076        String key = getTailerKey();
077        indexNamespace.remove(key);
078    }
079
080    private String getTailerKey() {
081        return basePath + " " + queueIndex + " " + nameSpace;
082    }
083
084    @Override
085    public M read(Duration timeout) throws InterruptedException {
086        M ret = read();
087        if (ret != null) {
088            return ret;
089        }
090        final long timeoutMs = timeout.toMillis();
091        final long deadline = System.currentTimeMillis() + timeoutMs;
092        final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
093        while (ret == null && System.currentTimeMillis() < deadline) {
094            Thread.sleep(delay);
095            ret = read();
096        }
097        return ret;
098    }
099
100    @SuppressWarnings("unchecked")
101    private M read() {
102        if (closed) {
103            throw new IllegalStateException("The tailer has been closed.");
104        }
105        final List<M> ret = new ArrayList<>(1);
106        if (tailer.readDocument(w -> ret.add((M) w.read("msg").object()))) {
107            return ret.get(0);
108        }
109        return null;
110    }
111
112    @Override
113    public Offset commit() {
114        // we write raw: queue, offset, timestamp
115        long offset = tailer.index();
116        offsetTracker.commit(offset);
117        if (log.isTraceEnabled()) {
118            log.trace(String.format("queue-%02d commit offset: %d", queueIndex, offset));
119        }
120        return new CQOffset(queueIndex, offset);
121    }
122
123    @Override
124    public void toEnd() {
125        log.debug(String.format("queue-%02d toEnd", queueIndex));
126        tailer.toEnd();
127    }
128
129    @Override
130    public void toStart() {
131        log.debug(String.format("queue-%02d toStart", queueIndex));
132        tailer.toStart();
133    }
134
135    @Override
136    public void toLastCommitted() {
137        long offset = offsetTracker.getLastCommittedOffset();
138        if (offset > 0) {
139            log.debug(String.format("queue-%02d toLastCommitted found: %d", queueIndex, offset));
140            tailer.moveToIndex(offset);
141        } else {
142            log.debug(String.format("queue-%02d toLastCommitted not found, run from beginning", queueIndex));
143            tailer.toStart();
144        }
145    }
146
147    @Override
148    public int getQueue() {
149        return queueIndex;
150    }
151
152    @Override
153    public void close() throws Exception {
154        offsetTracker.close();
155        unregisterTailer();
156        closed = true;
157    }
158}