001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.io.Externalizable;
024import java.io.File;
025import java.io.IOException;
026import java.nio.file.Files;
027import java.nio.file.Path;
028import java.time.Duration;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Objects;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.stream.Stream;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.codec.Codec;
038import org.nuxeo.lib.stream.log.LogOffset;
039import org.nuxeo.lib.stream.log.LogPartition;
040import org.nuxeo.lib.stream.log.LogTailer;
041import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
042import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
043
044import net.openhft.chronicle.queue.ChronicleQueue;
045import net.openhft.chronicle.queue.ExcerptAppender;
046import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
047import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
048
049/**
050 * Chronicle Queue implementation of LogAppender.
051 *
052 * @since 9.3
053 */
054public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
055    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);
056
057    protected static final String PARTITION_PREFIX = "P-";
058
059    protected static final int POLL_INTERVAL_MS = 100;
060
061    protected static final int MAX_PARTITIONS = 100;
062
063    public static final String MSG_KEY = "msg";
064
065    protected final List<ChronicleQueue> partitions;
066
067    protected final int nbPartitions;
068
069    protected final File basePath;
070
071    protected final String name;
072
073    // keep track of created tailers to make sure they are closed before the log
074    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();
075
076    protected final ChronicleRetentionDuration retention;
077
078    protected final Codec<M> codec;
079
080    protected volatile boolean closed;
081
082    protected ChronicleLogAppender(Codec<M> codec, File basePath, int size, ChronicleRetentionDuration retention) {
083        if (size == 0) {
084            // open
085            if (!exists(basePath)) {
086                throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
087            }
088            this.nbPartitions = partitions(basePath);
089        } else {
090            // create
091            if (size > MAX_PARTITIONS) {
092                throw new IllegalArgumentException(
093                        String.format("Cannot create more than: %d partitions for log: %s, requested: %d",
094                                MAX_PARTITIONS, basePath, size));
095            }
096            if (exists(basePath)) {
097                throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
098            }
099            if (!basePath.exists() && !basePath.mkdirs()) {
100                throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
101            }
102            this.nbPartitions = size;
103        }
104        Objects.requireNonNull(codec);
105        this.codec = codec;
106        this.name = basePath.getName();
107        this.basePath = basePath;
108        this.retention = retention;
109        partitions = new ArrayList<>(this.nbPartitions);
110        if (log.isDebugEnabled()) {
111            log.debug(((size == 0) ? "Opening: " : "Creating: ") + toString());
112        }
113        initPartitions();
114    }
115
116    protected void initPartitions() {
117        for (int i = 0; i < nbPartitions; i++) {
118            File path = new File(basePath, String.format("%s%02d", PARTITION_PREFIX, i));
119            if (retention.disable()) {
120                partitions.add(SingleChronicleQueueBuilder.binary(path).build());
121            } else {
122                ChronicleRetentionListener listener = new ChronicleRetentionListener(retention);
123                SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
124                                                                        .rollCycle(retention.getRollCycle())
125                                                                        .storeFileListener(listener)
126                                                                        .build();
127                listener.setQueue(queue);
128                partitions.add(queue);
129            }
130            try {
131                // make sure the directory is created so we can count the partitions
132                Files.createDirectories(path.toPath());
133            } catch (IOException e) {
134                throw new IllegalArgumentException("Cannot create directory: " + path.getAbsolutePath(), e);
135            }
136        }
137    }
138
139    protected static boolean exists(File basePath) {
140        // noinspection ConstantConditions
141        return basePath.isDirectory() && basePath.list().length > 0;
142    }
143
144    /**
145     * Create a new log
146     */
147    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size,
148            ChronicleRetentionDuration retention) {
149        return new ChronicleLogAppender<>(codec, basePath, size, retention);
150    }
151
152    /**
153     * Create a new log.
154     */
155    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size) {
156        return new ChronicleLogAppender<>(codec, basePath, size, ChronicleRetentionDuration.DISABLE);
157    }
158
159    /**
160     * Open an existing log.
161     */
162    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath) {
163        return new ChronicleLogAppender<>(codec, basePath, 0, ChronicleRetentionDuration.DISABLE);
164    }
165
166    /**
167     * Open an existing log.
168     */
169    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath,
170            ChronicleRetentionDuration retention) {
171        return new ChronicleLogAppender<>(codec, basePath, 0, retention);
172    }
173
174    public String getBasePath() {
175        return basePath.getPath();
176    }
177
178    @Override
179    public String name() {
180        return name;
181    }
182
183    @Override
184    public int size() {
185        return nbPartitions;
186    }
187
188    @Override
189    public LogOffset append(int partition, M message) {
190        ExcerptAppender appender = partitions.get(partition).acquireAppender();
191        if (NO_CODEC.equals(codec)) {
192            // default format for backward compatibility
193            appender.writeDocument(w -> w.write(MSG_KEY).object(message));
194        } else {
195            appender.writeDocument(w -> w.write().bytes(codec.encode(message)));
196        }
197        long offset = appender.lastIndexAppended();
198        LogOffset ret = new LogOffsetImpl(name, partition, offset);
199        if (log.isDebugEnabled()) {
200            log.debug(String.format("append to %s, value: %s", ret, message));
201        }
202        return ret;
203    }
204
205    public LogTailer<M> createTailer(LogPartition partition, String group, Codec<M> codec) {
206        return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(),
207                partitions.get(partition.partition()).createTailer(), partition, group, retention));
208    }
209
210    public long endOffset(int partition) {
211        return partitions.get(partition).createTailer().toEnd().index();
212    }
213
214    public long firstOffset(int partition) {
215        long ret = partitions.get(partition).firstIndex();
216        if (ret == Long.MAX_VALUE) {
217            return 0;
218        }
219        return ret;
220    }
221
222    public long countMessages(int partition, long lowerOffset, long upperOffset) {
223        long ret;
224        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
225        try {
226            ret = queue.countExcerpts(lowerOffset, upperOffset);
227        } catch (IllegalStateException e) {
228            if (log.isDebugEnabled()) {
229                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
230            }
231            return 0;
232        }
233        return ret;
234    }
235
236    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
237        tailers.add(tailer);
238        return tailer;
239    }
240
241    @Override
242    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
243        boolean ret;
244        long offsetPosition = offset.offset();
245        int partition = offset.partition().partition();
246        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
247                group)) {
248            ret = isProcessed(offsetTracker, offsetPosition);
249            if (ret) {
250                return true;
251            }
252            final long timeoutMs = timeout.toMillis();
253            final long deadline = System.currentTimeMillis() + timeoutMs;
254            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
255            while (!ret && System.currentTimeMillis() < deadline) {
256                Thread.sleep(delay);
257                ret = isProcessed(offsetTracker, offsetPosition);
258            }
259        }
260        return ret;
261    }
262
263    @Override
264    public boolean closed() {
265        return closed;
266    }
267
268    @Override
269    public Codec<M> getCodec() {
270        return codec;
271    }
272
273    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
274        long last = tracker.readLastCommittedOffset();
275        return last > 0 && last >= offset;
276    }
277
278    @Override
279    public void close() {
280        log.debug("Closing: " + toString());
281        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
282        tailers.clear();
283        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
284        partitions.clear();
285        closed = true;
286    }
287
288    public static int partitions(File basePath) {
289        int ret;
290        try (Stream<Path> paths = Files.list(basePath.toPath())) {
291            ret = (int) paths.filter(
292                    path -> (path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX)))
293                             .count();
294            if (ret == 0) {
295                throw new IOException("No chronicles queues file found");
296            }
297        } catch (IOException e) {
298            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
299        }
300        return ret;
301    }
302
303    @Override
304    public String toString() {
305        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
306                + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}';
307    }
308
309    public ChronicleRetentionDuration getRetention() {
310        return retention;
311    }
312}