001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
053
054/**
055 * Chronicle Queue implementation of LogAppender.
056 *
057 * @since 9.3
058 */
059public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
060    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);
061
062    protected static final String PARTITION_PREFIX = "P-";
063
064    protected static final String METADATA_FILE = "metadata.properties";
065
066    protected static final int POLL_INTERVAL_MS = 100;
067
068    protected static final int MAX_PARTITIONS = 100;
069
070    public static final String MSG_KEY = "msg";
071
072    // The block size determines the initial cq4 spare file size and the maximum message size.
073    // A 4M block size creates a 5M file and enable a 1MB message
074    public static final int CQ_BLOCK_SIZE = 4_194_304;
075
076    public static final String RETENTION_KEY = "retention";
077
078    public static final String PARTITIONS_KEY = "partitions";
079
080    public static final String BLOCK_SIZE_KEY = "blockSize";
081
082    protected final List<ChronicleQueue> partitions;
083
084    protected final int nbPartitions;
085
086    protected final File basePath;
087
088    protected final int blockSize;
089
090    protected final String name;
091
092    // keep track of created tailers to make sure they are closed before the log
093    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();
094
095    protected final ChronicleRetentionDuration retention;
096
097    protected final Codec<M> codec;
098
099    protected volatile boolean closed;
100
101    /**
102     * Open an existing Log
103     */
104    protected ChronicleLogAppender(Codec<M> codec, File basePath, ChronicleRetentionDuration retention) {
105        if (!exists(basePath)) {
106            throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
107        }
108        if (log.isDebugEnabled()) {
109            log.debug("Opening: " + toString());
110        }
111        Objects.requireNonNull(codec);
112        this.codec = codec;
113        this.basePath = basePath;
114        this.name = basePath.getName();
115
116        Path metadataPath = getMetadataPath();
117        Properties metadata;
118        if (metadataPath.toFile().exists()) {
119            metadata = readMetadata(getMetadataPath());
120        } else {
121            // backward compatibility
122            metadata = guessMetadata(retention);
123        }
124        ChronicleRetentionDuration storedRetention = new ChronicleRetentionDuration(
125                metadata.getProperty(RETENTION_KEY));
126        if (retention.disable()) {
127            this.retention = ChronicleRetentionDuration.disableOf(storedRetention);
128        } else if (retention.getRollCycle() == storedRetention.getRollCycle()) {
129            this.retention = retention;
130        } else {
131            // we can change the number of retention cycles but not the roll cycle
132            throw new IllegalArgumentException(String.format("Cannot open Log %s: expecting retention: %s got: %s",
133                    name, storedRetention, retention));
134        }
135        this.nbPartitions = Integer.parseInt(metadata.getProperty(PARTITIONS_KEY));
136        this.blockSize = Integer.parseInt(metadata.getProperty(BLOCK_SIZE_KEY));
137        this.partitions = new ArrayList<>(nbPartitions);
138        initPartitions(false);
139    }
140
141    /**
142     * Create a new Log
143     */
144    protected ChronicleLogAppender(Codec<M> codec, File basePath, int size, ChronicleRetentionDuration retention) {
145        if (size <= 0) {
146            throw new IllegalArgumentException("Number of partitions must be > 0");
147        }
148        if (size > MAX_PARTITIONS) {
149            throw new IllegalArgumentException(
150                    String.format("Cannot create more than: %d partitions for log: %s, requested: %d", MAX_PARTITIONS,
151                            basePath, size));
152        }
153        if (exists(basePath)) {
154            throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
155        }
156        if (!basePath.exists() && !basePath.mkdirs()) {
157            throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
158        }
159        Objects.requireNonNull(codec);
160        this.nbPartitions = size;
161        this.codec = codec;
162        this.name = basePath.getName();
163        this.basePath = basePath;
164        this.retention = retention;
165        this.partitions = new ArrayList<>(nbPartitions);
166        this.blockSize = CQ_BLOCK_SIZE;
167        if (log.isDebugEnabled()) {
168            log.debug("Creating: " + toString());
169        }
170        initPartitions(true);
171        saveMetadata();
172    }
173
174    protected void initPartitions(boolean create) {
175        for (int i = 0; i < nbPartitions; i++) {
176            Path partitionPath = Paths.get(getBasePath(), String.format("%s%02d", PARTITION_PREFIX, i));
177            if (create) {
178                try {
179                    Files.createDirectories(partitionPath);
180                } catch (IOException e) {
181                    throw new IllegalArgumentException("Cannot create directory: " + partitionPath.toAbsolutePath(), e);
182                }
183            }
184            ChronicleRetentionListener listener = null;
185            SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.binary(partitionPath)
186                                                                             .rollCycle(retention.getRollCycle())
187                                                                             .blockSize(blockSize);
188            if (!retention.disable()) {
189                listener = new ChronicleRetentionListener(retention);
190                builder.storeFileListener(listener);
191            }
192            SingleChronicleQueue queue = builder.build();
193            // we don't try to acquire an appender and pretouch because it causes troubles with countExcerpts
194            // the cq4 cycle file will be created on first append
195            partitions.add(queue);
196            if (listener != null) {
197                listener.setQueue(queue);
198            }
199        }
200    }
201
202    protected void saveMetadata() {
203        Path metadata = getMetadataPath();
204        StringBuilder builder = new StringBuilder();
205        builder.append(String.format("# Log created %s%n", Instant.now().toString()));
206        builder.append(String.format("%s=%d%n", PARTITIONS_KEY, nbPartitions));
207        builder.append(String.format("%s=%s%n", RETENTION_KEY, retention));
208        builder.append(String.format("%s=%d%n", BLOCK_SIZE_KEY, blockSize));
209        try {
210            Files.write(metadata, builder.toString().getBytes(), StandardOpenOption.CREATE_NEW);
211        } catch (IOException e) {
212            throw new IllegalArgumentException("Unable to create metadata file: " + metadata, e);
213        }
214        if (log.isDebugEnabled()) {
215            log.debug(String.format("Created Log: %s%n%s", name, builder.toString()), new Throwable("here"));
216        }
217    }
218
219    protected Path getMetadataPath() {
220        return basePath.toPath().resolve(METADATA_FILE);
221    }
222
223    protected static Properties readMetadata(Path file) {
224        Properties props = new Properties();
225        try (InputStream stream = Files.newInputStream(file)) {
226            props.load(stream);
227        } catch (IOException e) {
228            throw new IllegalArgumentException("Cannot open Log metadata file: " + file, e);
229        }
230        return props;
231    }
232
233    protected Properties guessMetadata(ChronicleRetentionDuration retention) {
234        Properties props = new Properties();
235        props.setProperty(PARTITIONS_KEY, Integer.toString(discoverPartitions(basePath.toPath())));
236        props.setProperty(RETENTION_KEY, retention.getRetention());
237        props.setProperty(BLOCK_SIZE_KEY, Integer.toString(CQ_BLOCK_SIZE));
238        return props;
239    }
240
241    protected static boolean exists(File basePath) {
242        // noinspection ConstantConditions
243        return basePath.isDirectory() && basePath.list().length > 0;
244    }
245
246    /**
247     * Create a new log
248     */
249    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size,
250            ChronicleRetentionDuration retention) {
251        return new ChronicleLogAppender<>(codec, basePath, size, retention);
252    }
253
254    /**
255     * Create a new log.
256     */
257    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size) {
258        return new ChronicleLogAppender<>(codec, basePath, size, ChronicleRetentionDuration.NONE);
259    }
260
261    /**
262     * Open an existing log.
263     */
264    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath) {
265        return new ChronicleLogAppender<>(codec, basePath, ChronicleRetentionDuration.NONE);
266    }
267
268    /**
269     * Open an existing log.
270     */
271    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath,
272            ChronicleRetentionDuration retention) {
273        return new ChronicleLogAppender<>(codec, basePath, retention);
274    }
275
276    public String getBasePath() {
277        return basePath.getPath();
278    }
279
280    @Override
281    public String name() {
282        return name;
283    }
284
285    @Override
286    public int size() {
287        return nbPartitions;
288    }
289
290    @Override
291    public LogOffset append(int partition, M message) {
292        ExcerptAppender appender = partitions.get(partition).acquireAppender();
293        if (NO_CODEC.equals(codec)) {
294            // default format for backward compatibility
295            appender.writeDocument(w -> w.write(MSG_KEY).object(message));
296        } else {
297            appender.writeDocument(w -> w.write().bytes(codec.encode(message)));
298        }
299        long offset = appender.lastIndexAppended();
300        LogOffset ret = new LogOffsetImpl(name, partition, offset);
301        if (log.isDebugEnabled()) {
302            log.debug(String.format("append to %s, value: %s", ret, message));
303        }
304        return ret;
305    }
306
307    public LogTailer<M> createTailer(LogPartition partition, String group, Codec<M> codec) {
308        return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(),
309                partitions.get(partition.partition()).createTailer(), partition, group, retention));
310    }
311
312    public long endOffset(int partition) {
313        return partitions.get(partition).createTailer().toEnd().index();
314    }
315
316    public long firstOffset(int partition) {
317        long ret = partitions.get(partition).firstIndex();
318        if (ret == Long.MAX_VALUE) {
319            return 0;
320        }
321        return ret;
322    }
323
324    public long countMessages(int partition, long lowerOffset, long upperOffset) {
325        long ret;
326        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
327        try {
328            ret = queue.countExcerpts(lowerOffset, upperOffset);
329        } catch (IllegalStateException e) {
330            if (log.isDebugEnabled()) {
331                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
332            }
333            return 0;
334        }
335        return ret;
336    }
337
338    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
339        tailers.add(tailer);
340        return tailer;
341    }
342
343    @Override
344    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
345        boolean ret;
346        long offsetPosition = offset.offset();
347        int partition = offset.partition().partition();
348        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
349                group, ChronicleRetentionDuration.disableOf(retention))) {
350            ret = isProcessed(offsetTracker, offsetPosition);
351            if (ret) {
352                return true;
353            }
354            final long timeoutMs = timeout.toMillis();
355            final long deadline = System.currentTimeMillis() + timeoutMs;
356            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
357            while (!ret && System.currentTimeMillis() < deadline) {
358                Thread.sleep(delay);
359                ret = isProcessed(offsetTracker, offsetPosition);
360            }
361        }
362        return ret;
363    }
364
365    @Override
366    public boolean closed() {
367        return closed;
368    }
369
370    @Override
371    public Codec<M> getCodec() {
372        return codec;
373    }
374
375    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
376        long last = tracker.readLastCommittedOffset();
377        return last > 0 && last >= offset;
378    }
379
380    @Override
381    public void close() {
382        log.debug("Closing: " + toString());
383        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
384        tailers.clear();
385        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
386        partitions.clear();
387        closed = true;
388    }
389
390    public static int partitions(Path basePath) {
391        Path metadataPath = basePath.resolve(METADATA_FILE);
392        if (metadataPath.toFile().exists()) {
393            return Integer.parseInt(readMetadata(metadataPath).getProperty(PARTITIONS_KEY));
394        }
395        // backward compatibility before metadata file
396        return discoverPartitions(basePath);
397    }
398
399    public static int discoverPartitions(Path basePath) {
400        try (Stream<Path> paths = Files.list(basePath)) {
401            int ret = (int) paths.filter(ChronicleLogAppender::isPartitionDirectory).count();
402            if (ret == 0) {
403                throw new IOException("No chronicles queues file found");
404            }
405            return ret;
406        } catch (IOException e) {
407            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
408        }
409    }
410
411    protected static boolean isPartitionDirectory(Path path) {
412        return path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX);
413    }
414
415    @Override
416    public String toString() {
417        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
418                + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}';
419    }
420
421    public ChronicleRetentionDuration getRetention() {
422        return retention;
423    }
424}