001package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020import net.openhft.chronicle.queue.ExcerptTailer; 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset; 024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition; 025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord; 026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer; 027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl; 028import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQPartitionGroup; 029 030import java.io.Externalizable; 031import java.time.Duration; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.List; 036import java.util.Objects; 037import java.util.Set; 038import java.util.concurrent.ConcurrentHashMap; 039 040/** 041 * @since 9.1 042 */ 043public class ChronicleMQTailer<M extends Externalizable> implements MQTailer<M> { 044 private static final Log log = LogFactory.getLog(ChronicleMQTailer.class); 045 protected static final long POLL_INTERVAL_MS = 100L; 046 047 private final String basePath; 048 private final ExcerptTailer cqTailer; 049 private final ChronicleMQOffsetTracker offsetTracker; 050 private final MQPartitionGroup id; 051 private final MQPartition partition; 052 private boolean closed = false; 053 054 // keep track of all tailers on the same namespace index even from different mq 055 private static final Set<MQPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap<MQPartitionGroup, Boolean>()); 056 057 public ChronicleMQTailer(String basePath, ExcerptTailer cqTailer, MQPartition partition, String group) { 058 Objects.requireNonNull(group); 059 this.basePath = basePath; 060 this.cqTailer = cqTailer; 061 this.partition = partition; 062 this.id = new MQPartitionGroup(group, partition.name(), partition.partition()); 063 registerTailer(); 064 this.offsetTracker = new ChronicleMQOffsetTracker(basePath, partition.partition(), group); 065 toLastCommitted(); 066 } 067 068 private void registerTailer() { 069 if (!tailersId.add(id)) { 070 throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + id); 071 } 072 } 073 074 private void unregisterTailer() { 075 tailersId.remove(id); 076 } 077 078 @Override 079 public MQRecord<M> read(Duration timeout) throws InterruptedException { 080 MQRecord<M> ret = read(); 081 if (ret != null) { 082 return ret; 083 } 084 final long timeoutMs = timeout.toMillis(); 085 final long deadline = System.currentTimeMillis() + timeoutMs; 086 final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 087 while (ret == null && System.currentTimeMillis() < deadline) { 088 Thread.sleep(delay); 089 ret = read(); 090 } 091 return ret; 092 } 093 094 095 @SuppressWarnings("unchecked") 096 protected MQRecord<M> read() { 097 if (closed) { 098 throw new IllegalStateException("The tailer has been closed."); 099 } 100 final List<M> value = new ArrayList<>(1); 101 if (!cqTailer.readDocument(w -> value.add((M) w.read("msg").object()))) { 102 return null; 103 104 } 105 return new MQRecord<>(partition, value.get(0), new MQOffsetImpl(partition, cqTailer.index())); 106 } 107 108 @Override 109 public MQOffset commit(MQPartition partition) { 110 // we write raw: queue, offset, timestamp 111 if (!this.partition.equals(partition)) { 112 throw new IllegalArgumentException("Can not commit this partition: " + partition + " from " + id); 113 } 114 long offset = cqTailer.index(); 115 offsetTracker.commit(offset); 116 if (log.isTraceEnabled()) { 117 log.trace(String.format("Commit %s:+%d", id, offset)); 118 } 119 return new MQOffsetImpl(partition, offset); 120 } 121 122 @Override 123 public void commit() { 124 commit(partition); 125 } 126 127 @Override 128 public void toEnd() { 129 log.debug(String.format("toEnd: %s", id)); 130 cqTailer.toEnd(); 131 } 132 133 @Override 134 public void toStart() { 135 log.debug(String.format("toStart: %s", id)); 136 cqTailer.toStart(); 137 } 138 139 @Override 140 public void toLastCommitted() { 141 long offset = offsetTracker.getLastCommittedOffset(); 142 if (offset > 0) { 143 log.debug(String.format("toLastCommitted: %s, found: %d", id, offset)); 144 cqTailer.moveToIndex(offset); 145 } else { 146 log.debug(String.format("toLastCommitted: %s not found, run from beginning", id)); 147 cqTailer.toStart(); 148 } 149 } 150 151 public void seek(MQPartition partition, MQOffset offset) { 152 cqTailer.moveToIndex(offset.offset()); 153 } 154 155 @Override 156 public Collection<MQPartition> assignments() { 157 return Collections.singletonList(new MQPartition(id.name, id.partition)); 158 } 159 160 @Override 161 public String group() { 162 return id.group; 163 } 164 165 @Override 166 public void close() throws Exception { 167 offsetTracker.close(); 168 unregisterTailer(); 169 closed = true; 170 } 171 172 @Override 173 public boolean closed() { 174 return closed; 175 } 176 177 @Override 178 public String toString() { 179 return "ChronicleMQTailer{" + 180 "basePath='" + basePath + '\'' + 181 ", id=" + id + 182 ", closed=" + closed + 183 '}'; 184 } 185 186}