001package org.nuxeo.ecm.platform.importer.mqueues.mqueues;/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020import net.openhft.chronicle.queue.ExcerptTailer; 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.platform.importer.mqueues.message.Message; 024 025import java.time.Duration; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031 032/** 033 * @since 9.1 034 */ 035public class CQTailer<M extends Message> implements MQueues.Tailer<M> { 036 private static final Log log = LogFactory.getLog(CQTailer.class); 037 private static final long POLL_INTERVAL_MS = 100L; 038 public static final String DEFAULT_OFFSET_NAMESPACE = "default"; 039 040 private final String basePath; 041 private final ExcerptTailer tailer; 042 private final String nameSpace; 043 private final int queueIndex; 044 private final CQOffsetTracker offsetTracker; 045 private boolean closed = false; 046 047 // keep track of all tailers on the same namespace index even from different mq 048 private static final Set<String> indexNamespace = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); 049 050 public CQTailer(String basePath, ExcerptTailer tailer, int queue) { 051 this(basePath, tailer, queue, null); 052 } 053 054 public CQTailer(String basePath, ExcerptTailer tailer, int queue, String nameSpace) { 055 this.basePath = basePath; 056 this.tailer = tailer; 057 this.queueIndex = queue; 058 if (nameSpace == null) { 059 this.nameSpace = DEFAULT_OFFSET_NAMESPACE; 060 } else { 061 this.nameSpace = nameSpace; 062 } 063 registerTailer(); 064 this.offsetTracker = new CQOffsetTracker(basePath, queue, this.nameSpace); 065 toLastCommitted(); 066 } 067 068 private void registerTailer() { 069 String key = getTailerKey(); 070 if (!indexNamespace.add(key)) { 071 throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + key); 072 } 073 } 074 075 private void unregisterTailer() { 076 String key = getTailerKey(); 077 indexNamespace.remove(key); 078 } 079 080 private String getTailerKey() { 081 return basePath + " " + queueIndex + " " + nameSpace; 082 } 083 084 @Override 085 public M read(Duration timeout) throws InterruptedException { 086 M ret = read(); 087 if (ret != null) { 088 return ret; 089 } 090 final long timeoutMs = timeout.toMillis(); 091 final long deadline = System.currentTimeMillis() + timeoutMs; 092 final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 093 while (ret == null && System.currentTimeMillis() < deadline) { 094 Thread.sleep(delay); 095 ret = read(); 096 } 097 return ret; 098 } 099 100 @SuppressWarnings("unchecked") 101 private M read() { 102 if (closed) { 103 throw new IllegalStateException("The tailer has been closed."); 104 } 105 final List<M> ret = new ArrayList<>(1); 106 if (tailer.readDocument(w -> ret.add((M) w.read("msg").object()))) { 107 return ret.get(0); 108 } 109 return null; 110 } 111 112 @Override 113 public Offset commit() { 114 // we write raw: queue, offset, timestamp 115 long offset = tailer.index(); 116 offsetTracker.commit(offset); 117 if (log.isTraceEnabled()) { 118 log.trace(String.format("queue-%02d commit offset: %d", queueIndex, offset)); 119 } 120 return new CQOffset(queueIndex, offset); 121 } 122 123 @Override 124 public void toEnd() { 125 log.debug(String.format("queue-%02d toEnd", queueIndex)); 126 tailer.toEnd(); 127 } 128 129 @Override 130 public void toStart() { 131 log.debug(String.format("queue-%02d toStart", queueIndex)); 132 tailer.toStart(); 133 } 134 135 @Override 136 public void toLastCommitted() { 137 long offset = offsetTracker.getLastCommittedOffset(); 138 if (offset > 0) { 139 log.debug(String.format("queue-%02d toLastCommitted found: %d", queueIndex, offset)); 140 tailer.moveToIndex(offset); 141 } else { 142 log.debug(String.format("queue-%02d toLastCommitted not found, run from beginning", queueIndex)); 143 tailer.toStart(); 144 } 145 } 146 147 @Override 148 public int getQueue() { 149 return queueIndex; 150 } 151 152 @Override 153 public void close() throws Exception { 154 offsetTracker.close(); 155 unregisterTailer(); 156 closed = true; 157 } 158}