/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

/**
 * Chronicle Queue implementation of LogAppender.
 * <p>
 * A Log is stored under a base directory that contains one sub directory per partition (named
 * {@value #PARTITION_PREFIX} followed by a two digit index) plus a {@value #METADATA_FILE} file that records the
 * number of partitions, the retention and the block size used at creation time.
 *
 * @since 9.3
 */
public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);

    protected static final String PARTITION_PREFIX = "P-";

    protected static final String METADATA_FILE = "metadata.properties";

    protected static final int POLL_INTERVAL_MS = 100;

    protected static final int MAX_PARTITIONS = 100;

    public static final String MSG_KEY = "msg";

    // The block size determines the initial cq4 spare file size and the maximum message size.
    // A 4M block size creates a 5M file and enables a 1MB message
    public static final int CQ_BLOCK_SIZE = 4_194_304;

    public static final String RETENTION_KEY = "retention";

    public static final String PARTITIONS_KEY = "partitions";

    public static final String BLOCK_SIZE_KEY = "blockSize";

    protected final List<ChronicleQueue> partitions;

    protected final int nbPartitions;

    protected final File basePath;

    protected final int blockSize;

    protected final String name;

    // keep track of created tailers to make sure they are closed before the log
    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final ChronicleRetentionDuration retention;

    protected final Codec<M> codec;

    protected volatile boolean closed;

    /**
     * Open an existing Log.
     * <p>
     * The number of partitions and the block size are read from the metadata file. When that file is missing they
     * are guessed from the directory layout (backward compatibility).
     *
     * @param codec the codec used to encode messages, must not be null
     * @param basePath the directory of the Log, must be an existing non empty directory
     * @param retention the requested retention; it may differ from the stored one by its number of cycles but not by
     *            its roll cycle, unless it is a disabled retention
     * @throws IllegalArgumentException if {@code basePath} is not a non empty directory, if the metadata cannot be
     *             read or if the requested roll cycle does not match the stored one
     * @throws NullPointerException if {@code codec} is null
     */
    protected ChronicleLogAppender(Codec<M> codec, File basePath, ChronicleRetentionDuration retention) {
        if (!exists(basePath)) {
            throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
        }
        Objects.requireNonNull(codec);
        this.codec = codec;
        this.basePath = basePath;
        this.name = basePath.getName();

        Path metadataPath = getMetadataPath();
        Properties metadata;
        if (metadataPath.toFile().exists()) {
            metadata = readMetadata(metadataPath);
        } else {
            // backward compatibility
            metadata = guessMetadata(retention);
        }
        ChronicleRetentionDuration storedRetention = new ChronicleRetentionDuration(
                metadata.getProperty(RETENTION_KEY));
        if (retention.disable()) {
            this.retention = ChronicleRetentionDuration.disableOf(storedRetention);
        } else if (retention.getRollCycle() == storedRetention.getRollCycle()) {
            this.retention = retention;
        } else {
            // we can change the number of retention cycles but not the roll cycle
            throw new IllegalArgumentException(String.format("Cannot open Log %s: expecting retention: %s got: %s",
                    name, storedRetention, retention));
        }
        this.nbPartitions = Integer.parseInt(metadata.getProperty(PARTITIONS_KEY));
        this.blockSize = Integer.parseInt(metadata.getProperty(BLOCK_SIZE_KEY));
        this.partitions = new ArrayList<>(nbPartitions);
        // log only once the state is populated, otherwise toString() reports unassigned fields
        if (log.isDebugEnabled()) {
            log.debug("Opening: " + toString());
        }
        initPartitions(false);
    }

    /**
     * Create a new Log.
     * <p>
     * Creates the base directory and one directory per partition, then writes the metadata file.
     *
     * @param codec the codec used to encode messages, must not be null
     * @param basePath the directory of the Log, must not exist or must be empty
     * @param size the number of partitions, between 1 and {@link #MAX_PARTITIONS}
     * @param retention the retention to apply to the Log
     * @throws IllegalArgumentException if {@code size} is out of range, if {@code basePath} already contains
     *             something or if the directories or the metadata file cannot be created
     * @throws NullPointerException if {@code codec} is null
     */
    protected ChronicleLogAppender(Codec<M> codec, File basePath, int size, ChronicleRetentionDuration retention) {
        if (size <= 0) {
            throw new IllegalArgumentException("Number of partitions must be > 0");
        }
        if (size > MAX_PARTITIONS) {
            throw new IllegalArgumentException(
                    String.format("Cannot create more than: %d partitions for log: %s, requested: %d", MAX_PARTITIONS,
                            basePath, size));
        }
        if (exists(basePath)) {
            throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
        }
        // validate the codec before touching the file system so that a failure leaves no empty directory behind
        Objects.requireNonNull(codec);
        if (!basePath.exists() && !basePath.mkdirs()) {
            throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
        }
        this.nbPartitions = size;
        this.codec = codec;
        this.name = basePath.getName();
        this.basePath = basePath;
        this.retention = retention;
        this.partitions = new ArrayList<>(nbPartitions);
        this.blockSize = CQ_BLOCK_SIZE;
        if (log.isDebugEnabled()) {
            log.debug("Creating: " + toString());
        }
        initPartitions(true);
        saveMetadata();
    }

    /**
     * Builds one Chronicle queue per partition and registers it in {@link #partitions}.
     *
     * @param create when true the partition directory is created before the queue is built
     * @throws IllegalArgumentException if a partition directory cannot be created
     */
    protected void initPartitions(boolean create) {
        for (int i = 0; i < nbPartitions; i++) {
            Path partitionPath = Paths.get(getBasePath(), String.format("%s%02d", PARTITION_PREFIX, i));
            if (create) {
                try {
                    Files.createDirectories(partitionPath);
                } catch (IOException e) {
                    throw new IllegalArgumentException("Cannot create directory: " + partitionPath.toAbsolutePath(), e);
                }
            }
            ChronicleRetentionListener listener = null;
            SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.binary(partitionPath)
                                                                             .rollCycle(retention.getRollCycle())
                                                                             .blockSize(blockSize);
            if (!retention.disable()) {
                listener = new ChronicleRetentionListener(retention);
                builder.storeFileListener(listener);
            }
            SingleChronicleQueue queue = builder.build();
            // we don't try to acquire an appender and pretouch because it causes troubles with countExcerpts
            // the cq4 cycle file will be created on first append
            partitions.add(queue);
            if (listener != null) {
                // the listener has to be registered on the builder before the queue exists,
                // so the queue can only be handed to it afterwards
                listener.setQueue(queue);
            }
        }
    }

    /**
     * Writes the metadata file describing this Log (number of partitions, retention, block size).
     *
     * @throws IllegalArgumentException if the file cannot be written, including when it already exists
     */
    protected void saveMetadata() {
        Path metadata = getMetadataPath();
        StringBuilder builder = new StringBuilder();
        builder.append(String.format("# Log created %s%n", Instant.now().toString()));
        builder.append(String.format("%s=%d%n", PARTITIONS_KEY, nbPartitions));
        builder.append(String.format("%s=%s%n", RETENTION_KEY, retention));
        builder.append(String.format("%s=%d%n", BLOCK_SIZE_KEY, blockSize));
        try {
            // the file is read back with Properties.load(InputStream) which decodes ISO-8859-1,
            // so encode explicitly instead of relying on the platform default charset
            Files.write(metadata, builder.toString().getBytes(StandardCharsets.ISO_8859_1),
                    StandardOpenOption.CREATE_NEW);
        } catch (IOException e) {
            throw new IllegalArgumentException("Unable to create metadata file: " + metadata, e);
        }
        if (log.isDebugEnabled()) {
            // the Throwable is only there to get the stack trace of the creation in the debug log
            log.debug(String.format("Created Log: %s%n%s", name, builder.toString()), new Throwable("here"));
        }
    }

    /**
     * Returns the path of the metadata file inside the base directory.
     */
    protected Path getMetadataPath() {
        return basePath.toPath().resolve(METADATA_FILE);
    }

    /**
     * Loads a metadata file.
     *
     * @param file the path of the properties file to read
     * @return the loaded properties
     * @throws IllegalArgumentException if the file cannot be read
     */
    protected static Properties readMetadata(Path file) {
        Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(file)) {
            props.load(stream);
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot open Log metadata file: " + file, e);
        }
        return props;
    }

    /**
     * Builds the metadata of a Log that has no metadata file: the number of partitions is discovered from the
     * directory layout, the retention is the requested one and the block size is {@link #CQ_BLOCK_SIZE}.
     *
     * @param retention the retention requested by the caller
     * @return the guessed metadata
     */
    protected Properties guessMetadata(ChronicleRetentionDuration retention) {
        Properties props = new Properties();
        props.setProperty(PARTITIONS_KEY, Integer.toString(discoverPartitions(basePath.toPath())));
        props.setProperty(RETENTION_KEY, retention.getRetention());
        props.setProperty(BLOCK_SIZE_KEY, Integer.toString(CQ_BLOCK_SIZE));
        return props;
    }

    /**
     * Returns true if the path is a directory that contains at least one entry.
     * <p>
     * A directory whose content cannot be listed is reported as not existing.
     */
    protected static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File#list returns null when an I/O error occurs
        String[] children = basePath.list();
        return children != null && children.length > 0;
    }

    /**
     * Create a new log.
     *
     * @param codec the codec used to encode messages
     * @param basePath the directory of the Log
     * @param size the number of partitions
     * @param retention the retention to apply
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size,
            ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, size, retention);
    }

    /**
     * Create a new log using {@link ChronicleRetentionDuration#NONE} as retention.
     *
     * @param codec the codec used to encode messages
     * @param basePath the directory of the Log
     * @param size the number of partitions
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size) {
        return new ChronicleLogAppender<>(codec, basePath, size, ChronicleRetentionDuration.NONE);
    }

    /**
     * Open an existing log using {@link ChronicleRetentionDuration#NONE} as requested retention.
     *
     * @param codec the codec used to encode messages
     * @param basePath the directory of the Log
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath) {
        return new ChronicleLogAppender<>(codec, basePath, ChronicleRetentionDuration.NONE);
    }

    /**
     * Open an existing log.
     *
     * @param codec the codec used to encode messages
     * @param basePath the directory of the Log
     * @param retention the requested retention
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath,
            ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, retention);
    }

    /**
     * Returns the base directory of the Log as a string.
     */
    public String getBasePath() {
        return basePath.getPath();
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public int size() {
        return nbPartitions;
    }

    /**
     * Appends a message to a partition.
     *
     * @param partition the index of the partition
     * @param message the message to append
     * @return the offset of the appended message
     */
    @Override
    public LogOffset append(int partition, M message) {
        ExcerptAppender appender = partitions.get(partition).acquireAppender();
        if (NO_CODEC.equals(codec)) {
            // default format for backward compatibility
            appender.writeDocument(w -> w.write(MSG_KEY).object(message));
        } else {
            appender.writeDocument(w -> w.write().bytes(codec.encode(message)));
        }
        long offset = appender.lastIndexAppended();
        LogOffset ret = new LogOffsetImpl(name, partition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", ret, message));
        }
        return ret;
    }

    /**
     * Creates a tailer on a partition for a consumer group. The tailer is tracked so that it gets closed when this
     * appender is closed.
     *
     * @param partition the partition to read
     * @param group the consumer group
     * @param codec the codec used by the tailer to decode messages
     */
    public LogTailer<M> createTailer(LogPartition partition, String group, Codec<M> codec) {
        return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(),
                partitions.get(partition.partition()).createTailer(), partition, group, retention));
    }

    /**
     * Returns the index found at the end of the partition.
     */
    public long endOffset(int partition) {
        return partitions.get(partition).createTailer().toEnd().index();
    }

    /**
     * Returns the first index of the partition, or 0 when the queue reports {@code Long.MAX_VALUE}.
     */
    public long firstOffset(int partition) {
        long ret = partitions.get(partition).firstIndex();
        // NOTE(review): Long.MAX_VALUE presumably means that the queue has no cycle file yet - confirm
        if (ret == Long.MAX_VALUE) {
            return 0;
        }
        return ret;
    }

    /**
     * Counts the messages of a partition between two offsets.
     *
     * @param partition the index of the partition
     * @param lowerOffset the lower offset
     * @param upperOffset the upper offset
     * @return the number of excerpts reported by the queue, or 0 when the queue raises an
     *         {@link IllegalStateException}
     */
    public long countMessages(int partition, long lowerOffset, long upperOffset) {
        long ret;
        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
        try {
            ret = queue.countExcerpts(lowerOffset, upperOffset);
        } catch (IllegalStateException e) {
            if (log.isDebugEnabled()) {
                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
            }
            return 0;
        }
        return ret;
    }

    /**
     * Registers a tailer so that it is closed with this appender.
     *
     * @return the given tailer
     */
    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
        tailers.add(tailer);
        return tailer;
    }

    /**
     * Waits until the last committed offset of a consumer group reaches the given offset, checking every
     * {@link #POLL_INTERVAL_MS} ms (or less when the timeout is shorter).
     *
     * @param offset the offset to wait for
     * @param group the consumer group
     * @param timeout the maximum time to wait
     * @return true if the offset has been reached, false if the timeout expired first
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        boolean ret;
        long offsetPosition = offset.offset();
        int partition = offset.partition().partition();
        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
                group, ChronicleRetentionDuration.disableOf(retention))) {
            ret = isProcessed(offsetTracker, offsetPosition);
            if (ret) {
                return true;
            }
            final long timeoutMs = timeout.toMillis();
            final long deadline = System.currentTimeMillis() + timeoutMs;
            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
            while (!ret && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                ret = isProcessed(offsetTracker, offsetPosition);
            }
        }
        return ret;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    /**
     * Returns true if the last committed offset read from the tracker is strictly positive and not lower than the
     * given offset.
     */
    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return last > 0 && last >= offset;
    }

    /**
     * Closes the tracked tailers, then the partition queues, and marks this appender as closed.
     */
    @Override
    public void close() {
        log.debug("Closing: " + toString());
        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
        tailers.clear();
        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
        partitions.clear();
        closed = true;
    }

    /**
     * Returns the number of partitions of the Log stored under the given directory, reading it from the metadata
     * file when present and counting the partition directories otherwise.
     *
     * @param basePath the directory of the Log
     * @throws IllegalArgumentException if the metadata file cannot be read or if no partition directory is found
     */
    public static int partitions(Path basePath) {
        Path metadataPath = basePath.resolve(METADATA_FILE);
        if (metadataPath.toFile().exists()) {
            return Integer.parseInt(readMetadata(metadataPath).getProperty(PARTITIONS_KEY));
        }
        // backward compatibility before metadata file
        return discoverPartitions(basePath);
    }

    /**
     * Counts the partition directories found directly under the given directory.
     *
     * @param basePath the directory of the Log
     * @return the number of partition directories, always greater than 0
     * @throws IllegalArgumentException if the directory cannot be listed or contains no partition directory
     */
    public static int discoverPartitions(Path basePath) {
        try (Stream<Path> paths = Files.list(basePath)) {
            int ret = (int) paths.filter(ChronicleLogAppender::isPartitionDirectory).count();
            if (ret == 0) {
                throw new IOException("No chronicles queues file found");
            }
            return ret;
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
    }

    /**
     * Returns true if the path is a directory whose name starts with {@link #PARTITION_PREFIX}.
     */
    protected static boolean isPartitionDirectory(Path path) {
        return path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX);
    }

    @Override
    public String toString() {
        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
                + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}';
    }

    /**
     * Returns the retention applied by this appender.
     */
    public ChronicleRetentionDuration getRetention() {
        return retention;
    }
}