001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.Externalizable; 022import java.io.File; 023import java.io.IOException; 024import java.nio.file.Files; 025import java.nio.file.Path; 026import java.time.Duration; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Objects; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.stream.Stream; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.lib.stream.log.LogOffset; 036import org.nuxeo.lib.stream.log.LogPartition; 037import org.nuxeo.lib.stream.log.LogTailer; 038import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 039import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 040 041import net.openhft.chronicle.queue.ChronicleQueue; 042import net.openhft.chronicle.queue.ExcerptAppender; 043import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 044import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 045 046/** 047 * Chronicle Queue implementation of LogAppender. 048 * 049 * @since 9.3 050 */ 051public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> { 052 private static final Log log = LogFactory.getLog(ChronicleLogAppender.class); 053 054 protected static final String PARTITION_PREFIX = "P-"; 055 056 protected static final int POLL_INTERVAL_MS = 100; 057 058 protected static final int MAX_PARTITIONS = 100; 059 060 protected final List<ChronicleQueue> partitions; 061 062 protected final int nbPartitions; 063 064 protected final File basePath; 065 066 protected final String name; 067 068 // keep track of created tailers to make sure they are closed before the log 069 protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 070 071 protected final ChronicleRetentionDuration retention; 072 073 protected volatile boolean closed; 074 075 protected ChronicleLogAppender(File basePath, int size, ChronicleRetentionDuration retention) { 076 if (size == 0) { 077 // open 078 if (!exists(basePath)) { 079 throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath); 080 } 081 this.nbPartitions = findNbQueues(basePath); 082 } else { 083 // create 084 if (size > MAX_PARTITIONS) { 085 throw new IllegalArgumentException( 086 String.format("Cannot create more than: %d partitions for log: %s, requested: %d", 087 MAX_PARTITIONS, basePath, size)); 088 } 089 if (exists(basePath)) { 090 throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath); 091 } 092 if (!basePath.exists() && !basePath.mkdirs()) { 093 throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath); 094 } 095 this.nbPartitions = size; 096 } 097 this.name = basePath.getName(); 098 this.basePath = basePath; 099 this.retention = retention; 100 partitions = new ArrayList<>(this.nbPartitions); 101 if (log.isDebugEnabled()) { 102 log.debug(((size == 0) ? "Opening: " : "Creating: ") + toString()); 103 } 104 initPartitions(); 105 } 106 107 protected void initPartitions() { 108 for (int i = 0; i < nbPartitions; i++) { 109 File path = new File(basePath, String.format("%s%02d", PARTITION_PREFIX, i)); 110 if (retention.disable()) { 111 partitions.add(SingleChronicleQueueBuilder.binary(path).build()); 112 } else { 113 ChronicleRetentionListener listener = new ChronicleRetentionListener(retention); 114 SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path) 115 .rollCycle(retention.getRollCycle()) 116 .storeFileListener(listener) 117 .build(); 118 listener.setQueue(queue); 119 partitions.add(queue); 120 } 121 try { 122 // make sure the directory is created so we can count the partitions 123 Files.createDirectories(path.toPath()); 124 } catch (IOException e) { 125 throw new IllegalArgumentException("Cannot create directory: " + path.getAbsolutePath(), e); 126 } 127 } 128 } 129 130 protected static boolean exists(File basePath) { 131 // noinspection ConstantConditions 132 return basePath.isDirectory() && basePath.list().length > 0; 133 } 134 135 /** 136 * Create a new log 137 */ 138 public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size, 139 ChronicleRetentionDuration retention) { 140 return new ChronicleLogAppender<>(basePath, size, retention); 141 } 142 143 /** 144 * Create a new log. 145 */ 146 public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size) { 147 return new ChronicleLogAppender<>(basePath, size, ChronicleRetentionDuration.DISABLE); 148 } 149 150 /** 151 * Open an existing log. 152 */ 153 public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath) { 154 return new ChronicleLogAppender<>(basePath, 0, ChronicleRetentionDuration.DISABLE); 155 } 156 157 /** 158 * Open an existing log. 159 */ 160 public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath, 161 ChronicleRetentionDuration retention) { 162 return new ChronicleLogAppender<>(basePath, 0, retention); 163 } 164 165 public String getBasePath() { 166 return basePath.getPath(); 167 } 168 169 @Override 170 public String name() { 171 return name; 172 } 173 174 @Override 175 public int size() { 176 return nbPartitions; 177 } 178 179 @Override 180 public LogOffset append(int partition, M message) { 181 ExcerptAppender appender = partitions.get(partition).acquireAppender(); 182 appender.writeDocument(w -> w.write("msg").object(message)); 183 long offset = appender.lastIndexAppended(); 184 LogOffset ret = new LogOffsetImpl(name, partition, offset); 185 if (log.isDebugEnabled()) { 186 log.debug(String.format("append to %s, value: %s", ret, message)); 187 } 188 return ret; 189 } 190 191 public LogTailer<M> createTailer(LogPartition partition, String group) { 192 return addTailer(new ChronicleLogTailer<>(basePath.toString(), 193 partitions.get(partition.partition()).createTailer(), partition, group, retention)); 194 } 195 196 public long endOffset(int partition) { 197 return partitions.get(partition).createTailer().toEnd().index(); 198 } 199 200 public long firstOffset(int partition) { 201 long ret = partitions.get(partition).firstIndex(); 202 if (ret == Long.MAX_VALUE) { 203 return 0; 204 } 205 return ret; 206 } 207 208 public long countMessages(int partition, long lowerOffset, long upperOffset) { 209 long ret; 210 SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition); 211 try { 212 ret = queue.countExcerpts(lowerOffset, upperOffset); 213 } catch (IllegalStateException e) { 214 if (log.isDebugEnabled()) { 215 log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage()); 216 } 217 return 0; 218 } 219 // System.out.println("partition: " + partition + ", count from " + lowerOffset + " to " + upperOffset + " = " + 220 // ret); 221 return ret; 222 } 223 224 protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) { 225 tailers.add(tailer); 226 return tailer; 227 } 228 229 @Override 230 public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException { 231 boolean ret; 232 long offsetPosition = offset.offset(); 233 int partition = offset.partition().partition(); 234 try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition, 235 group)) { 236 ret = isProcessed(offsetTracker, offsetPosition); 237 if (ret) { 238 return true; 239 } 240 final long timeoutMs = timeout.toMillis(); 241 final long deadline = System.currentTimeMillis() + timeoutMs; 242 final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 243 while (!ret && System.currentTimeMillis() < deadline) { 244 Thread.sleep(delay); 245 ret = isProcessed(offsetTracker, offsetPosition); 246 } 247 } 248 return ret; 249 } 250 251 @Override 252 public boolean closed() { 253 return closed; 254 } 255 256 protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) { 257 long last = tracker.readLastCommittedOffset(); 258 return last > 0 && last >= offset; 259 } 260 261 @Override 262 public void close() { 263 log.debug("Closing: " + toString()); 264 tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close); 265 tailers.clear(); 266 partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close); 267 partitions.clear(); 268 closed = true; 269 } 270 271 protected int findNbQueues(File basePath) { 272 int ret; 273 try (Stream<Path> paths = Files.list(basePath.toPath())) { 274 ret = (int) paths.filter( 275 path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(PARTITION_PREFIX))) 276 .count(); 277 if (ret == 0) { 278 throw new IOException("No chronicles queues file found"); 279 } 280 } catch (IOException e) { 281 throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e); 282 } 283 return ret; 284 } 285 286 @Override 287 public String toString() { 288 return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name 289 + '\'' + ", retention=" + retention + ", closed=" + closed + '}'; 290 } 291 292 public ChronicleRetentionDuration getRetention() { 293 return retention; 294 } 295}