001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.io.Externalizable; 024import java.io.File; 025import java.io.IOException; 026import java.io.InputStream; 027import java.nio.file.Files; 028import java.nio.file.Path; 029import java.nio.file.StandardOpenOption; 030import java.time.Duration; 031import java.time.Instant; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Objects; 035import java.util.Properties; 036import java.util.concurrent.ConcurrentLinkedQueue; 037import java.util.stream.Stream; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.lib.stream.StreamRuntimeException; 042import org.nuxeo.lib.stream.codec.Codec; 043import org.nuxeo.lib.stream.log.LogOffset; 044import org.nuxeo.lib.stream.log.LogPartition; 045import org.nuxeo.lib.stream.log.LogTailer; 046import org.nuxeo.lib.stream.log.Name; 047import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 048import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 049 050import net.openhft.chronicle.bytes.util.DecoratedBufferOverflowException; 051import net.openhft.chronicle.queue.ChronicleQueue; 052import net.openhft.chronicle.queue.ExcerptAppender; 053import net.openhft.chronicle.queue.ExcerptTailer; 054import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 055import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 056 057/** 058 * Chronicle Queue implementation of LogAppender. 059 * 060 * @since 9.3 061 */ 062public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> { 063 private static final Log log = LogFactory.getLog(ChronicleLogAppender.class); 064 065 protected static final String PARTITION_PREFIX = "P-"; 066 067 protected static final String METADATA_FILE = "metadata.properties"; 068 069 protected static final int POLL_INTERVAL_MS = 100; 070 071 protected static final int MAX_PARTITIONS = 100; 072 073 public static final String MSG_KEY = "msg"; 074 075 // The block size determines the initial cq4 spare file size and the maximum message size. 076 // A 4M block size creates a 5M file and enable a 1MB message 077 public static final int CQ_BLOCK_SIZE = 4_194_304; 078 079 public static final String RETENTION_KEY = "retention"; 080 081 public static final String PARTITIONS_KEY = "partitions"; 082 083 public static final String BLOCK_SIZE_KEY = "blockSize"; 084 085 protected final List<ChronicleQueue> partitions; 086 087 protected final int nbPartitions; 088 089 protected final File basePath; 090 091 protected final int blockSize; 092 093 protected final Name name; 094 095 // keep track of created tailers to make sure they are closed before the log 096 protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 097 098 protected final ChronicleRetentionDuration retention; 099 100 protected final Codec<M> codec; 101 102 protected volatile boolean closed; 103 104 /** 105 * Open an existing Log 106 */ 107 protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention) { 108 basePath = config.getBasePath().resolve(name.getId()).toFile(); 109 if (!exists(basePath)) { 110 throw new IllegalArgumentException("Cannot open Chronicle Log, invalid path: " + basePath); 111 } 112 if (log.isDebugEnabled()) { 113 log.debug("Opening: " + toString()); 114 } 115 Objects.requireNonNull(codec); 116 this.codec = codec; 117 118 this.name = name; 119 120 Path metadataPath = getMetadataPath(); 121 122 if (!metadataPath.toFile().exists()) { 123 throw new IllegalArgumentException( 124 String.format("Cannot open Log %s: no metadata file %s", this.name, metadataPath)); 125 } 126 Properties metadata = readMetadata(getMetadataPath()); 127 if (!withRetention) { 128 this.retention = ChronicleRetentionDuration.disableOf(config.getRetention()); 129 } else { 130 ChronicleRetentionDuration storedRetention = new ChronicleRetentionDuration( 131 metadata.getProperty(RETENTION_KEY)); 132 if (config.getRetention().getRollCycle() != storedRetention.getRollCycle()) { 133 // we can change the number of retention cycles but not the roll cycle 134 throw new IllegalArgumentException(String.format("Cannot open Log %s: expecting retention: %s got: %s", 135 this.name, storedRetention, config.getRetention())); 136 } 137 this.retention = config.getRetention(); 138 } 139 this.nbPartitions = Integer.parseInt(metadata.getProperty(PARTITIONS_KEY)); 140 this.blockSize = Integer.parseInt(metadata.getProperty(BLOCK_SIZE_KEY)); 141 this.partitions = new ArrayList<>(nbPartitions); 142 initPartitions(false); 143 } 144 145 /** 146 * Create a new Log 147 */ 148 protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec) { 149 if (size <= 0) { 150 throw new IllegalArgumentException("Number of partitions must be > 0"); 151 } 152 basePath = config.getBasePath().resolve(name.getId()).toFile(); 153 if (size > MAX_PARTITIONS) { 154 throw new IllegalArgumentException( 155 String.format("Cannot create more than: %d partitions for log: %s, requested: %d", MAX_PARTITIONS, 156 basePath, size)); 157 } 158 if (exists(basePath)) { 159 throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath); 160 } 161 if (!basePath.exists() && !basePath.mkdirs()) { 162 throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath); 163 } 164 Objects.requireNonNull(codec); 165 this.nbPartitions = size; 166 this.codec = codec; 167 this.retention = config.getRetention(); 168 this.partitions = new ArrayList<>(nbPartitions); 169 this.name = name; 170 this.blockSize = CQ_BLOCK_SIZE; 171 if (log.isDebugEnabled()) { 172 log.debug("Creating: " + toString()); 173 } 174 initPartitions(true); 175 saveMetadata(); 176 } 177 178 protected void initPartitions(boolean create) { 179 for (int i = 0; i < nbPartitions; i++) { 180 Path partitionPath = basePath.toPath().resolve(String.format("%s%02d", PARTITION_PREFIX, i)); 181 if (create) { 182 try { 183 Files.createDirectories(partitionPath); 184 } catch (IOException e) { 185 throw new IllegalArgumentException("Cannot create directory: " + partitionPath.toAbsolutePath(), e); 186 } 187 } 188 ChronicleRetentionListener listener = null; 189 SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.binary(partitionPath) 190 .rollCycle(retention.getRollCycle()) 191 .blockSize(blockSize); 192 if (!retention.disable()) { 193 listener = new ChronicleRetentionListener(retention); 194 builder.storeFileListener(listener); 195 } 196 SingleChronicleQueue queue = builder.build(); 197 // we don't try to acquire an appender and pretouch because it causes troubles with countExcerpts 198 // the cq4 cycle file will be created on first append 199 partitions.add(queue); 200 if (listener != null) { 201 listener.setQueue(queue); 202 } 203 } 204 } 205 206 protected void saveMetadata() { 207 Path metadata = getMetadataPath(); 208 StringBuilder builder = new StringBuilder(); 209 builder.append(String.format("# Log created %s%n", Instant.now().toString())); 210 builder.append(String.format("%s=%d%n", PARTITIONS_KEY, nbPartitions)); 211 builder.append(String.format("%s=%s%n", RETENTION_KEY, retention)); 212 builder.append(String.format("%s=%d%n", BLOCK_SIZE_KEY, blockSize)); 213 try { 214 Files.write(metadata, builder.toString().getBytes(), StandardOpenOption.CREATE_NEW); 215 } catch (IOException e) { 216 throw new IllegalArgumentException("Unable to create metadata file: " + metadata, e); 217 } 218 if (log.isDebugEnabled()) { 219 log.debug(String.format("Created Log: %s%n%s", name, builder.toString()), new Throwable("here")); 220 } 221 } 222 223 protected Path getMetadataPath() { 224 return basePath.toPath().resolve(METADATA_FILE); 225 } 226 227 protected static Properties readMetadata(Path file) { 228 Properties props = new Properties(); 229 try (InputStream stream = Files.newInputStream(file)) { 230 props.load(stream); 231 } catch (IOException e) { 232 throw new IllegalArgumentException("Cannot open Log metadata file: " + file, e); 233 } 234 return props; 235 } 236 237 protected static boolean exists(File basePath) { 238 // noinspection ConstantConditions 239 return basePath.isDirectory() && basePath.list().length > 0; 240 } 241 242 /** 243 * Create a new log 244 */ 245 public static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name, 246 int size, Codec<M> codec) { 247 return new ChronicleLogAppender<>(config, name, size, codec); 248 } 249 250 /** 251 * Open an existing log. 252 */ 253 public static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name, 254 Codec<M> codec) { 255 return new ChronicleLogAppender<>(config, name, codec, true); 256 } 257 258 public static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config, 259 Name name, Codec<M> codec) { 260 return new ChronicleLogAppender<>(config, name, codec, false); 261 } 262 263 public String getBasePath() { 264 return basePath.getPath(); 265 } 266 267 @Override 268 public Name name() { 269 return name; 270 } 271 272 @Override 273 public int size() { 274 return nbPartitions; 275 } 276 277 @Override 278 public LogOffset append(int partition, M message) { 279 ExcerptAppender appender = partitions.get(partition).acquireAppender(); 280 try { 281 if (NO_CODEC.equals(codec)) { 282 // default format for backward compatibility 283 appender.writeDocument(w -> w.write(MSG_KEY).object(message)); 284 } else { 285 appender.writeDocument(w -> w.write().bytes(codec.encode(message))); 286 } 287 } catch (DecoratedBufferOverflowException e) { 288 throw new StreamRuntimeException(e); 289 } 290 long offset = appender.lastIndexAppended(); 291 LogOffset ret = new LogOffsetImpl(name, partition, offset); 292 if (log.isDebugEnabled()) { 293 log.debug(String.format("append to %s, value: %s", ret, message)); 294 } 295 return ret; 296 } 297 298 public LogTailer<M> createTailer(LogPartition partition, Name group, Codec<M> codec) { 299 return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(), 300 partitions.get(partition.partition()).createTailer(), partition, group, retention)); 301 } 302 303 public long endOffset(int partition) { 304 try (ExcerptTailer tailer = partitions.get(partition).createTailer().toEnd()) { 305 return tailer.index(); 306 } 307 } 308 309 public long firstOffset(int partition) { 310 long ret = partitions.get(partition).firstIndex(); 311 if (ret == Long.MAX_VALUE) { 312 return 0; 313 } 314 return ret; 315 } 316 317 public long countMessages(int partition, long lowerOffset, long upperOffset) { 318 long ret; 319 SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition); 320 try { 321 ret = queue.countExcerpts(lowerOffset, upperOffset); 322 } catch (IllegalStateException e) { 323 if (log.isDebugEnabled()) { 324 log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage()); 325 } 326 return 0; 327 } 328 return ret; 329 } 330 331 protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) { 332 tailers.add(tailer); 333 return tailer; 334 } 335 336 @Override 337 public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException { 338 boolean ret; 339 long offsetPosition = offset.offset(); 340 int partition = offset.partition().partition(); 341 try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition, 342 group, ChronicleRetentionDuration.disableOf(retention))) { 343 ret = isProcessed(offsetTracker, offsetPosition); 344 if (ret) { 345 return true; 346 } 347 final long timeoutMs = timeout.toMillis(); 348 final long deadline = System.currentTimeMillis() + timeoutMs; 349 final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 350 while (!ret && System.currentTimeMillis() < deadline) { 351 Thread.sleep(delay); 352 ret = isProcessed(offsetTracker, offsetPosition); 353 } 354 } 355 return ret; 356 } 357 358 @Override 359 public boolean closed() { 360 return closed; 361 } 362 363 @Override 364 public Codec<M> getCodec() { 365 return codec; 366 } 367 368 protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) { 369 long last = tracker.readLastCommittedOffset(); 370 return last > 0 && last >= offset; 371 } 372 373 @Override 374 public void close() { 375 log.debug("Closing: " + toString()); 376 tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close); 377 tailers.clear(); 378 partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close); 379 partitions.clear(); 380 closed = true; 381 } 382 383 public static int partitions(Path basePath) { 384 Path metadataPath = basePath.resolve(METADATA_FILE); 385 if (!metadataPath.toFile().exists()) { 386 throw new IllegalArgumentException("No CQ file on " + basePath); 387 } 388 return Integer.parseInt(readMetadata(metadataPath).getProperty(PARTITIONS_KEY)); 389 } 390 391 public static int discoverPartitions(Path basePath) { 392 try (Stream<Path> paths = Files.list(basePath)) { 393 int ret = (int) paths.filter(ChronicleLogAppender::isPartitionDirectory).count(); 394 if (ret == 0) { 395 throw new IOException("No chronicles queues file found"); 396 } 397 return ret; 398 } catch (IOException e) { 399 throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e); 400 } 401 } 402 403 protected static boolean isPartitionDirectory(Path path) { 404 return path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX); 405 } 406 407 @Override 408 public String toString() { 409 return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name 410 + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}'; 411 } 412 413 public ChronicleRetentionDuration getRetention() { 414 return retention; 415 } 416}