/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;

import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;


/**
 * A tailer that wraps several {@link ChronicleMQTailer} and exposes them as a single {@link MQTailer}.
 * <p>
 * Records are read from the wrapped tailers in round-robin order. Positioning and commit operations are
 * forwarded to the wrapped tailers. This class performs no synchronization of its own.
 *
 * @since 9.2
 */
public class ChronicleCompoundMQTailer<M extends Externalizable> implements MQTailer<M> {
    private final List<ChronicleMQTailer<M>> tailers = new ArrayList<>();
    private final String group;
    private final int size;
    private final List<MQPartition> mqPartitions = new ArrayList<>();
    private boolean closed = false;
    // Monotonic read counter used to pick the next tailer in round-robin order.
    private long counter = 0;

    /**
     * Creates a compound tailer on top of the given tailers.
     *
     * @param tailers the tailers to aggregate, an empty collection is an accepted input
     * @param group the value returned by {@link #group()}
     */
    public ChronicleCompoundMQTailer(Collection<ChronicleMQTailer<M>> tailers, String group) {
        // empty tailers is an accepted input
        this.tailers.addAll(tailers);
        this.group = group;
        this.size = tailers.size();
        tailers.forEach(tailer -> mqPartitions.addAll(tailer.assignments()));
    }

    /**
     * Reads the next available record, polling the wrapped tailers until a record is found or the
     * timeout has elapsed.
     *
     * @param timeout the maximum time to wait for a record
     * @return a record, or {@code null} if none became available before the timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        MQRecord<M> ret = read();
        if (ret != null) {
            return ret;
        }
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        long remaining = deadline - System.currentTimeMillis();
        while (ret == null && remaining > 0) {
            // never sleep past the deadline
            Thread.sleep(Math.min(ChronicleMQTailer.POLL_INTERVAL_MS, remaining));
            ret = read();
            remaining = deadline - System.currentTimeMillis();
        }
        return ret;
    }

    /**
     * Does a single non-blocking pass over the wrapped tailers, starting after the tailer used by the
     * previous call.
     *
     * @return the first record found, or {@code null} if there is no tailer or no tailer returned a record
     */
    private MQRecord<M> read() {
        if (size <= 0) {
            return null;
        }
        // round robin on tailers
        final long end = counter + size;
        do {
            counter++;
            // Apply the modulo on the long value *before* narrowing: casting first would yield a
            // negative index once the counter exceeds Integer.MAX_VALUE.
            final int i = (int) (counter % size);
            final MQRecord<M> ret = tailers.get(i).read();
            if (ret != null) {
                return ret;
            }
        } while (counter < end);
        return null;
    }

    /**
     * Commits the given partition on the first wrapped tailer that is assigned to it.
     *
     * @param partition the partition to commit
     * @return the offset returned by the matching tailer
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public MQOffset commit(MQPartition partition) {
        for (MQTailer<M> tailer : tailers) {
            if (tailer.assignments().contains(partition)) {
                return tailer.commit(partition);
            }
        }
        throw new IllegalArgumentException("No tailer matching: " + partition);
    }

    /**
     * Calls {@code commit()} on every wrapped tailer.
     */
    @Override
    public void commit() {
        tailers.forEach(MQTailer::commit);
    }

    /**
     * Calls {@code toEnd()} on every wrapped tailer.
     */
    @Override
    public void toEnd() {
        tailers.forEach(ChronicleMQTailer::toEnd);
    }

    /**
     * Calls {@code toStart()} on every wrapped tailer.
     */
    @Override
    public void toStart() {
        tailers.forEach(ChronicleMQTailer::toStart);
    }

    /**
     * Calls {@code toLastCommitted()} on every wrapped tailer.
     */
    @Override
    public void toLastCommitted() {
        tailers.forEach(ChronicleMQTailer::toLastCommitted);
    }

    /**
     * @return the partitions collected from all the wrapped tailers at construction time
     */
    @Override
    public Collection<MQPartition> assignments() {
        return mqPartitions;
    }

    /**
     * @return the group given at construction time
     */
    @Override
    public String group() {
        return group;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override
    public boolean closed() {
        return closed;
    }

    /**
     * Seeks to the given offset on the first wrapped tailer that is assigned to the partition.
     *
     * @param partition the partition to position
     * @param offset the offset to seek to
     */
    public void seek(MQPartition partition, MQOffset offset) {
        for (ChronicleMQTailer<M> tailer : tailers) {
            if (tailer.assignments().contains(partition)) {
                tailer.seek(partition, offset);
                return;
            }
        }
        // NOTE(review): an unknown partition is silently ignored here while commit(MQPartition) throws
        // an IllegalArgumentException; kept as is to preserve the existing behavior for callers.
    }

    /**
     * Closes every wrapped tailer, even if some of them fail to close, then marks this tailer as closed.
     *
     * @throws Exception the first failure raised by a wrapped tailer, with any later failures attached
     *             as suppressed exceptions
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        for (ChronicleMQTailer<M> tailer : tailers) {
            try {
                tailer.close();
            } catch (Exception e) {
                // keep closing the remaining tailers so that a single failure does not leak the others
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        closed = true;
        if (failure != null) {
            throw failure;
        }
    }
}