001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.NavigableSet;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import net.openhft.chronicle.queue.impl.StoreFileListener;
033import net.openhft.chronicle.queue.impl.WireStore;
034import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
035
036/**
037 * @since 9.3
038 */
039public class ChronicleRetentionListener implements StoreFileListener {
040    private static final Log log = LogFactory.getLog(ChronicleRetentionListener.class);
041
042    protected final ChronicleRetentionDuration retention;
043
044    protected SingleChronicleQueue queue;
045
046    protected long purgedStamp;
047
048    public ChronicleRetentionListener(ChronicleRetentionDuration retention) {
049        this.retention = retention;
050    }
051
052    public void setQueue(SingleChronicleQueue queue) {
053        this.queue = queue;
054    }
055
056    @Override
057    public void onAcquired(int cycle, File file) {
058        if (queue == null || retention.disable()) {
059            return;
060        }
061        if (log.isDebugEnabled()) {
062            log.debug(String.format("Acquire Chronicle file: %s, cycle: %s", file, cycle));
063        }
064    }
065
066    public synchronized void purge() {
067        if (queue == null || retention.disable()) {
068            return;
069        }
070        List<Integer> cycles = getAllCycles();
071        int cyclesToRemove = cycles.size() - retention.getRetentionCycles();
072        if (cyclesToRemove <= 0) {
073            return;
074        }
075        purgedStamp = System.currentTimeMillis();
076        cycles.subList(0, cyclesToRemove).forEach(this::dropCycle);
077        // this is needed to update first cycle, it calls directoryListing.refresh()
078        queue.createTailer();
079    }
080
081    protected void dropCycle(Integer cycle) {
082        File file = getFileForCycle(cycle);
083        if (!file.exists()) {
084            return;
085        }
086        log.info(String.format("Deleting Chronicle file: %s according to retention: %s", file.getAbsolutePath(),
087                retention));
088        try {
089            Files.delete(file.toPath());
090            log.debug(file + " deleted");
091        } catch (IOException | SecurityException e) {
092            log.warn(String.format("Unable to delete Chronicle file: %s, %s", file.getAbsolutePath(), e.getMessage()));
093        }
094    }
095
096    protected List<Integer> getAllCycles() {
097        List<Integer> ret = new ArrayList<>();
098        try {
099            NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle());
100            allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue()));
101            return ret;
102        } catch (ParseException e) {
103            throw new RuntimeException("Fail to list cycles for queue: " + queue, e);
104        }
105    }
106
107    protected File getFileForCycle(int cycle) {
108        WireStore store = queue.storeForCycle(cycle, queue.epoch(), false);
109        return (store != null) ? store.file() : null;
110    }
111
112    @Override
113    public void onReleased(int cycle, File file) {
114        if (queue == null || retention.disable()) {
115            return;
116        }
117        if (log.isDebugEnabled()) {
118            log.debug(String.format("Release Chronicle file: %s, cycle: %d", file, cycle));
119        }
120        if (checkPurge()) {
121            purge();
122        }
123    }
124
125    protected boolean checkPurge() {
126        // there is no need to purge more than the cycle length (which is duration in ms)
127        if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) {
128            return true;
129        }
130        if (log.isDebugEnabled()) {
131            log.debug("Skipping purge already done in within cycle duration: " + purgedStamp);
132        }
133        return false;
134    }
135
136}