001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.nio.file.StandardOpenOption;
030import java.time.Duration;
031import java.time.Instant;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Objects;
035import java.util.Properties;
036import java.util.concurrent.ConcurrentLinkedQueue;
037import java.util.stream.Stream;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.lib.stream.StreamRuntimeException;
042import org.nuxeo.lib.stream.codec.Codec;
043import org.nuxeo.lib.stream.log.LogOffset;
044import org.nuxeo.lib.stream.log.LogPartition;
045import org.nuxeo.lib.stream.log.LogTailer;
046import org.nuxeo.lib.stream.log.Name;
047import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
048import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
049
050import net.openhft.chronicle.bytes.util.DecoratedBufferOverflowException;
051import net.openhft.chronicle.queue.ChronicleQueue;
052import net.openhft.chronicle.queue.ExcerptAppender;
053import net.openhft.chronicle.queue.ExcerptTailer;
054import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
055import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
056
057/**
058 * Chronicle Queue implementation of LogAppender.
059 *
060 * @since 9.3
061 */
062public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
063    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);
064
065    protected static final String PARTITION_PREFIX = "P-";
066
067    protected static final String METADATA_FILE = "metadata.properties";
068
069    protected static final int POLL_INTERVAL_MS = 100;
070
071    protected static final int MAX_PARTITIONS = 100;
072
073    public static final String MSG_KEY = "msg";
074
075    // The block size determines the initial cq4 spare file size and the maximum message size.
076    // A 4M block size creates a 5M file and enable a 1MB message
077    public static final int CQ_BLOCK_SIZE = 4_194_304;
078
079    public static final String RETENTION_KEY = "retention";
080
081    public static final String PARTITIONS_KEY = "partitions";
082
083    public static final String BLOCK_SIZE_KEY = "blockSize";
084
085    protected final List<ChronicleQueue> partitions;
086
087    protected final int nbPartitions;
088
089    protected final File basePath;
090
091    protected final int blockSize;
092
093    protected final Name name;
094
095    // keep track of created tailers to make sure they are closed before the log
096    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();
097
098    protected final ChronicleRetentionDuration retention;
099
100    protected final Codec<M> codec;
101
102    protected volatile boolean closed;
103
104    /**
105     * Open an existing Log
106     */
107    protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention) {
108        basePath = config.getBasePath().resolve(name.getId()).toFile();
109        if (!exists(basePath)) {
110            throw new IllegalArgumentException("Cannot open Chronicle Log, invalid path: " + basePath);
111        }
112        if (log.isDebugEnabled()) {
113            log.debug("Opening: " + toString());
114        }
115        Objects.requireNonNull(codec);
116        this.codec = codec;
117
118        this.name = name;
119
120        Path metadataPath = getMetadataPath();
121
122        if (!metadataPath.toFile().exists()) {
123            throw new IllegalArgumentException(
124                    String.format("Cannot open Log %s: no metadata file %s", this.name, metadataPath));
125        }
126        Properties metadata = readMetadata(getMetadataPath());
127        if (!withRetention) {
128            this.retention = ChronicleRetentionDuration.disableOf(config.getRetention());
129        } else {
130            ChronicleRetentionDuration storedRetention = new ChronicleRetentionDuration(
131                    metadata.getProperty(RETENTION_KEY));
132            if (config.getRetention().getRollCycle() != storedRetention.getRollCycle()) {
133                // we can change the number of retention cycles but not the roll cycle
134                throw new IllegalArgumentException(String.format("Cannot open Log %s: expecting retention: %s got: %s",
135                        this.name, storedRetention, config.getRetention()));
136            }
137            this.retention = config.getRetention();
138        }
139        this.nbPartitions = Integer.parseInt(metadata.getProperty(PARTITIONS_KEY));
140        this.blockSize = Integer.parseInt(metadata.getProperty(BLOCK_SIZE_KEY));
141        this.partitions = new ArrayList<>(nbPartitions);
142        initPartitions(false);
143    }
144
145    /**
146     * Create a new Log
147     */
148    protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec) {
149        if (size <= 0) {
150            throw new IllegalArgumentException("Number of partitions must be > 0");
151        }
152        basePath = config.getBasePath().resolve(name.getId()).toFile();
153        if (size > MAX_PARTITIONS) {
154            throw new IllegalArgumentException(
155                    String.format("Cannot create more than: %d partitions for log: %s, requested: %d", MAX_PARTITIONS,
156                            basePath, size));
157        }
158        if (exists(basePath)) {
159            throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
160        }
161        if (!basePath.exists() && !basePath.mkdirs()) {
162            throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
163        }
164        Objects.requireNonNull(codec);
165        this.nbPartitions = size;
166        this.codec = codec;
167        this.retention = config.getRetention();
168        this.partitions = new ArrayList<>(nbPartitions);
169        this.name = name;
170        this.blockSize = CQ_BLOCK_SIZE;
171        if (log.isDebugEnabled()) {
172            log.debug("Creating: " + toString());
173        }
174        initPartitions(true);
175        saveMetadata();
176    }
177
178    protected void initPartitions(boolean create) {
179        for (int i = 0; i < nbPartitions; i++) {
180            Path partitionPath = basePath.toPath().resolve(String.format("%s%02d", PARTITION_PREFIX, i));
181            if (create) {
182                try {
183                    Files.createDirectories(partitionPath);
184                } catch (IOException e) {
185                    throw new IllegalArgumentException("Cannot create directory: " + partitionPath.toAbsolutePath(), e);
186                }
187            }
188            ChronicleRetentionListener listener = null;
189            SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.binary(partitionPath)
190                                                                             .rollCycle(retention.getRollCycle())
191                                                                             .blockSize(blockSize);
192            if (!retention.disable()) {
193                listener = new ChronicleRetentionListener(retention);
194                builder.storeFileListener(listener);
195            }
196            SingleChronicleQueue queue = builder.build();
197            // we don't try to acquire an appender and pretouch because it causes troubles with countExcerpts
198            // the cq4 cycle file will be created on first append
199            partitions.add(queue);
200            if (listener != null) {
201                listener.setQueue(queue);
202            }
203        }
204    }
205
206    protected void saveMetadata() {
207        Path metadata = getMetadataPath();
208        StringBuilder builder = new StringBuilder();
209        builder.append(String.format("# Log created %s%n", Instant.now().toString()));
210        builder.append(String.format("%s=%d%n", PARTITIONS_KEY, nbPartitions));
211        builder.append(String.format("%s=%s%n", RETENTION_KEY, retention));
212        builder.append(String.format("%s=%d%n", BLOCK_SIZE_KEY, blockSize));
213        try {
214            Files.write(metadata, builder.toString().getBytes(), StandardOpenOption.CREATE_NEW);
215        } catch (IOException e) {
216            throw new IllegalArgumentException("Unable to create metadata file: " + metadata, e);
217        }
218        if (log.isDebugEnabled()) {
219            log.debug(String.format("Created Log: %s%n%s", name, builder.toString()), new Throwable("here"));
220        }
221    }
222
223    protected Path getMetadataPath() {
224        return basePath.toPath().resolve(METADATA_FILE);
225    }
226
227    protected static Properties readMetadata(Path file) {
228        Properties props = new Properties();
229        try (InputStream stream = Files.newInputStream(file)) {
230            props.load(stream);
231        } catch (IOException e) {
232            throw new IllegalArgumentException("Cannot open Log metadata file: " + file, e);
233        }
234        return props;
235    }
236
237    protected static boolean exists(File basePath) {
238        // noinspection ConstantConditions
239        return basePath.isDirectory() && basePath.list().length > 0;
240    }
241
242    /**
243     * Create a new log
244     */
245    public static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name,
246            int size, Codec<M> codec) {
247        return new ChronicleLogAppender<>(config, name, size, codec);
248    }
249
250    /**
251     * Open an existing log.
252     */
253    public static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name,
254            Codec<M> codec) {
255        return new ChronicleLogAppender<>(config, name, codec, true);
256    }
257
258    public static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config,
259            Name name, Codec<M> codec) {
260        return new ChronicleLogAppender<>(config, name, codec, false);
261    }
262
263    public String getBasePath() {
264        return basePath.getPath();
265    }
266
267    @Override
268    public Name name() {
269        return name;
270    }
271
272    @Override
273    public int size() {
274        return nbPartitions;
275    }
276
277    @Override
278    public LogOffset append(int partition, M message) {
279        ExcerptAppender appender = partitions.get(partition).acquireAppender();
280        try {
281            if (NO_CODEC.equals(codec)) {
282                // default format for backward compatibility
283                appender.writeDocument(w -> w.write(MSG_KEY).object(message));
284            } else {
285                appender.writeDocument(w -> w.write().bytes(codec.encode(message)));
286            }
287        } catch (DecoratedBufferOverflowException e) {
288            throw new StreamRuntimeException(e);
289        }
290        long offset = appender.lastIndexAppended();
291        LogOffset ret = new LogOffsetImpl(name, partition, offset);
292        if (log.isDebugEnabled()) {
293            log.debug(String.format("append to %s, value: %s", ret, message));
294        }
295        return ret;
296    }
297
298    public LogTailer<M> createTailer(LogPartition partition, Name group, Codec<M> codec) {
299        return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(),
300                partitions.get(partition.partition()).createTailer(), partition, group, retention));
301    }
302
303    public long endOffset(int partition) {
304        try (ExcerptTailer tailer = partitions.get(partition).createTailer().toEnd()) {
305            return tailer.index();
306        }
307    }
308
309    public long firstOffset(int partition) {
310        long ret = partitions.get(partition).firstIndex();
311        if (ret == Long.MAX_VALUE) {
312            return 0;
313        }
314        return ret;
315    }
316
317    public long countMessages(int partition, long lowerOffset, long upperOffset) {
318        long ret;
319        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
320        try {
321            ret = queue.countExcerpts(lowerOffset, upperOffset);
322        } catch (IllegalStateException e) {
323            if (log.isDebugEnabled()) {
324                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
325            }
326            return 0;
327        }
328        return ret;
329    }
330
331    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
332        tailers.add(tailer);
333        return tailer;
334    }
335
336    @Override
337    public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException {
338        boolean ret;
339        long offsetPosition = offset.offset();
340        int partition = offset.partition().partition();
341        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
342                group, ChronicleRetentionDuration.disableOf(retention))) {
343            ret = isProcessed(offsetTracker, offsetPosition);
344            if (ret) {
345                return true;
346            }
347            final long timeoutMs = timeout.toMillis();
348            final long deadline = System.currentTimeMillis() + timeoutMs;
349            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
350            while (!ret && System.currentTimeMillis() < deadline) {
351                Thread.sleep(delay);
352                ret = isProcessed(offsetTracker, offsetPosition);
353            }
354        }
355        return ret;
356    }
357
358    @Override
359    public boolean closed() {
360        return closed;
361    }
362
363    @Override
364    public Codec<M> getCodec() {
365        return codec;
366    }
367
368    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
369        long last = tracker.readLastCommittedOffset();
370        return last > 0 && last >= offset;
371    }
372
373    @Override
374    public void close() {
375        log.debug("Closing: " + toString());
376        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
377        tailers.clear();
378        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
379        partitions.clear();
380        closed = true;
381    }
382
383    public static int partitions(Path basePath) {
384        Path metadataPath = basePath.resolve(METADATA_FILE);
385        if (!metadataPath.toFile().exists()) {
386            throw new IllegalArgumentException("No CQ file on " + basePath);
387        }
388        return Integer.parseInt(readMetadata(metadataPath).getProperty(PARTITIONS_KEY));
389    }
390
391    public static int discoverPartitions(Path basePath) {
392        try (Stream<Path> paths = Files.list(basePath)) {
393            int ret = (int) paths.filter(ChronicleLogAppender::isPartitionDirectory).count();
394            if (ret == 0) {
395                throw new IOException("No chronicles queues file found");
396            }
397            return ret;
398        } catch (IOException e) {
399            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
400        }
401    }
402
403    protected static boolean isPartitionDirectory(Path path) {
404        return path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX);
405    }
406
407    @Override
408    public String toString() {
409        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
410                + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}';
411    }
412
413    public ChronicleRetentionDuration getRetention() {
414        return retention;
415    }
416}