001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.Externalizable;
022import java.io.File;
023import java.io.IOException;
024import java.nio.file.Files;
025import java.nio.file.Path;
026import java.time.Duration;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Objects;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.stream.Stream;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.lib.stream.log.LogOffset;
036import org.nuxeo.lib.stream.log.LogPartition;
037import org.nuxeo.lib.stream.log.LogTailer;
038import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
039import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
040
041import net.openhft.chronicle.queue.ChronicleQueue;
042import net.openhft.chronicle.queue.ExcerptAppender;
043import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
044import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
045
046/**
047 * Chronicle Queue implementation of LogAppender.
048 *
049 * @since 9.3
050 */
051public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
052    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);
053
054    protected static final String PARTITION_PREFIX = "P-";
055
056    protected static final int POLL_INTERVAL_MS = 100;
057
058    protected static final int MAX_PARTITIONS = 100;
059
060    protected final List<ChronicleQueue> partitions;
061
062    protected final int nbPartitions;
063
064    protected final File basePath;
065
066    protected final String name;
067
068    // keep track of created tailers to make sure they are closed before the log
069    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();
070
071    protected final ChronicleRetentionDuration retention;
072
073    protected volatile boolean closed;
074
075    protected ChronicleLogAppender(File basePath, int size, ChronicleRetentionDuration retention) {
076        if (size == 0) {
077            // open
078            if (!exists(basePath)) {
079                throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
080            }
081            this.nbPartitions = findNbQueues(basePath);
082        } else {
083            // create
084            if (size > MAX_PARTITIONS) {
085                throw new IllegalArgumentException(
086                        String.format("Cannot create more than: %d partitions for log: %s, requested: %d",
087                                MAX_PARTITIONS, basePath, size));
088            }
089            if (exists(basePath)) {
090                throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
091            }
092            if (!basePath.exists() && !basePath.mkdirs()) {
093                throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
094            }
095            this.nbPartitions = size;
096        }
097        this.name = basePath.getName();
098        this.basePath = basePath;
099        this.retention = retention;
100        partitions = new ArrayList<>(this.nbPartitions);
101        if (log.isDebugEnabled()) {
102            log.debug(((size == 0) ? "Opening: " : "Creating: ") + toString());
103        }
104        initPartitions();
105    }
106
107    protected void initPartitions() {
108        for (int i = 0; i < nbPartitions; i++) {
109            File path = new File(basePath, String.format("%s%02d", PARTITION_PREFIX, i));
110            if (retention.disable()) {
111                partitions.add(SingleChronicleQueueBuilder.binary(path).build());
112            } else {
113                ChronicleRetentionListener listener = new ChronicleRetentionListener(retention);
114                SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
115                                                                        .rollCycle(retention.getRollCycle())
116                                                                        .storeFileListener(listener)
117                                                                        .build();
118                listener.setQueue(queue);
119                partitions.add(queue);
120            }
121            try {
122                // make sure the directory is created so we can count the partitions
123                Files.createDirectories(path.toPath());
124            } catch (IOException e) {
125                throw new IllegalArgumentException("Cannot create directory: " + path.getAbsolutePath(), e);
126            }
127        }
128    }
129
130    protected static boolean exists(File basePath) {
131        // noinspection ConstantConditions
132        return basePath.isDirectory() && basePath.list().length > 0;
133    }
134
135    /**
136     * Create a new log
137     */
138    public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size,
139            ChronicleRetentionDuration retention) {
140        return new ChronicleLogAppender<>(basePath, size, retention);
141    }
142
143    /**
144     * Create a new log.
145     */
146    public static <M extends Externalizable> ChronicleLogAppender<M> create(File basePath, int size) {
147        return new ChronicleLogAppender<>(basePath, size, ChronicleRetentionDuration.DISABLE);
148    }
149
150    /**
151     * Open an existing log.
152     */
153    public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath) {
154        return new ChronicleLogAppender<>(basePath, 0, ChronicleRetentionDuration.DISABLE);
155    }
156
157    /**
158     * Open an existing log.
159     */
160    public static <M extends Externalizable> ChronicleLogAppender<M> open(File basePath,
161            ChronicleRetentionDuration retention) {
162        return new ChronicleLogAppender<>(basePath, 0, retention);
163    }
164
165    public String getBasePath() {
166        return basePath.getPath();
167    }
168
169    @Override
170    public String name() {
171        return name;
172    }
173
174    @Override
175    public int size() {
176        return nbPartitions;
177    }
178
179    @Override
180    public LogOffset append(int partition, M message) {
181        ExcerptAppender appender = partitions.get(partition).acquireAppender();
182        appender.writeDocument(w -> w.write("msg").object(message));
183        long offset = appender.lastIndexAppended();
184        LogOffset ret = new LogOffsetImpl(name, partition, offset);
185        if (log.isDebugEnabled()) {
186            log.debug(String.format("append to %s, value: %s", ret, message));
187        }
188        return ret;
189    }
190
191    public LogTailer<M> createTailer(LogPartition partition, String group) {
192        return addTailer(new ChronicleLogTailer<>(basePath.toString(),
193                partitions.get(partition.partition()).createTailer(), partition, group, retention));
194    }
195
196    public long endOffset(int partition) {
197        return partitions.get(partition).createTailer().toEnd().index();
198    }
199
200    public long firstOffset(int partition) {
201        long ret = partitions.get(partition).firstIndex();
202        if (ret == Long.MAX_VALUE) {
203            return 0;
204        }
205        return ret;
206    }
207
208    public long countMessages(int partition, long lowerOffset, long upperOffset) {
209        long ret;
210        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
211        try {
212            ret = queue.countExcerpts(lowerOffset, upperOffset);
213        } catch (IllegalStateException e) {
214            if (log.isDebugEnabled()) {
215                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
216            }
217            return 0;
218        }
219        // System.out.println("partition: " + partition + ", count from " + lowerOffset + " to " + upperOffset + " = " +
220        // ret);
221        return ret;
222    }
223
224    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
225        tailers.add(tailer);
226        return tailer;
227    }
228
229    @Override
230    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
231        boolean ret;
232        long offsetPosition = offset.offset();
233        int partition = offset.partition().partition();
234        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
235                group)) {
236            ret = isProcessed(offsetTracker, offsetPosition);
237            if (ret) {
238                return true;
239            }
240            final long timeoutMs = timeout.toMillis();
241            final long deadline = System.currentTimeMillis() + timeoutMs;
242            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
243            while (!ret && System.currentTimeMillis() < deadline) {
244                Thread.sleep(delay);
245                ret = isProcessed(offsetTracker, offsetPosition);
246            }
247        }
248        return ret;
249    }
250
251    @Override
252    public boolean closed() {
253        return closed;
254    }
255
256    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
257        long last = tracker.readLastCommittedOffset();
258        return last > 0 && last >= offset;
259    }
260
261    @Override
262    public void close() {
263        log.debug("Closing: " + toString());
264        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
265        tailers.clear();
266        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
267        partitions.clear();
268        closed = true;
269    }
270
271    protected int findNbQueues(File basePath) {
272        int ret;
273        try (Stream<Path> paths = Files.list(basePath.toPath())) {
274            ret = (int) paths.filter(
275                    path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(PARTITION_PREFIX)))
276                             .count();
277            if (ret == 0) {
278                throw new IOException("No chronicles queues file found");
279            }
280        } catch (IOException e) {
281            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
282        }
283        return ret;
284    }
285
286    @Override
287    public String toString() {
288        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
289                + '\'' + ", retention=" + retention + ", closed=" + closed + '}';
290    }
291
292    public ChronicleRetentionDuration getRetention() {
293        return retention;
294    }
295}