001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.NavigableSet;
028
029import org.apache.logging.log4j.LogManager;
030import org.apache.logging.log4j.Logger;
031import org.jetbrains.annotations.Nullable;
032import org.nuxeo.lib.stream.StreamRuntimeException;
033
034import net.openhft.chronicle.queue.impl.StoreFileListener;
035import net.openhft.chronicle.queue.impl.WireStore;
036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
038
039/**
040 * @since 9.3
041 */
042public class ChronicleRetentionListener implements StoreFileListener {
043
044    private static final Logger log = LogManager.getLogger(ChronicleRetentionListener.class);
045
046    protected final ChronicleRetentionDuration retention;
047
048    protected SingleChronicleQueue queue;
049
050    protected long purgedStamp;
051
052    public ChronicleRetentionListener(ChronicleRetentionDuration retention) {
053        this.retention = retention;
054    }
055
056    public void setQueue(SingleChronicleQueue queue) {
057        this.queue = queue;
058    }
059
060    @Override
061    public void onAcquired(int cycle, File file) {
062        if (queue == null || retention.disable()) {
063            return;
064        }
065        log.debug("Acquire Chronicle file: {}, cycle: {}", file, cycle);
066
067    }
068
069    public synchronized void purge() {
070        if (queue == null || queue.isClosed() || retention.disable() || !queue.file().exists()) {
071            return;
072        }
073        List<Integer> cycles = getAllCycles();
074        int cyclesToRemove = cycles.size() - retention.getRetentionCycles();
075        if (cyclesToRemove <= 0) {
076            return;
077        }
078        purgedStamp = System.currentTimeMillis();
079        cycles.subList(0, cyclesToRemove).forEach(this::dropCycle);
080        // this is needed to update first cycle, it calls directoryListing.refresh()
081        queue.createTailer().close();
082    }
083
084    protected void dropCycle(Integer cycle) {
085        SingleChronicleQueueStore store = queue.storeForCycle(cycle, queue.epoch(), false, null);
086        if (store == null) {
087            return;
088        }
089        File file = store.file();
090        if (file == null || !file.exists()) {
091            return;
092        }
093        log.info("Deleting Chronicle file: {} according to retention: {}", file::getAbsolutePath, () -> retention);
094        try {
095            queue.closeStore(store);
096            Files.delete(file.toPath());
097            queue.refreshDirectoryListing();
098            log.debug(file + " deleted");
099        } catch (IOException | SecurityException e) {
100            log.warn("Unable to delete Chronicle file: {}, {}", file::getAbsolutePath, e::getMessage);
101        }
102    }
103
104    protected List<Integer> getAllCycles() {
105        List<Integer> ret = new ArrayList<>();
106        try {
107            NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle());
108            allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue()));
109            return ret;
110        } catch (ParseException e) {
111            throw new StreamRuntimeException("Fail to list cycles for queue: " + queue, e);
112        }
113    }
114
115    @Override
116    public void onReleased(int cycle, File file) {
117        if (queue == null || queue.isClosed() || retention.disable()) {
118            return;
119        }
120        log.debug("Release Chronicle file: {}, cycle: {}", file, cycle);
121        if (checkPurge()) {
122            purge();
123        }
124    }
125
126    protected boolean checkPurge() {
127        // there is no need to purge more than the cycle length (which is duration in ms)
128        if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) {
129            return true;
130        }
131        log.debug("Skipping purge already done in within cycle duration: {}", purgedStamp);
132        return false;
133    }
134
135}