001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.NavigableSet;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.lib.stream.StreamRuntimeException;
032
033import net.openhft.chronicle.queue.impl.StoreFileListener;
034import net.openhft.chronicle.queue.impl.WireStore;
035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
036
037/**
038 * @since 9.3
039 */
040public class ChronicleRetentionListener implements StoreFileListener {
041    private static final Log log = LogFactory.getLog(ChronicleRetentionListener.class);
042
043    protected final ChronicleRetentionDuration retention;
044
045    protected SingleChronicleQueue queue;
046
047    protected long purgedStamp;
048
049    public ChronicleRetentionListener(ChronicleRetentionDuration retention) {
050        this.retention = retention;
051    }
052
053    public void setQueue(SingleChronicleQueue queue) {
054        this.queue = queue;
055    }
056
057    @Override
058    public void onAcquired(int cycle, File file) {
059        if (queue == null || retention.disable()) {
060            return;
061        }
062        if (log.isDebugEnabled()) {
063            log.debug(String.format("Acquire Chronicle file: %s, cycle: %s", file, cycle));
064        }
065    }
066
067    public synchronized void purge() {
068        if (queue == null || queue.isClosed() || retention.disable() || !queue.file().exists()) {
069            return;
070        }
071        List<Integer> cycles = getAllCycles();
072        int cyclesToRemove = cycles.size() - retention.getRetentionCycles();
073        if (cyclesToRemove <= 0) {
074            return;
075        }
076        purgedStamp = System.currentTimeMillis();
077        cycles.subList(0, cyclesToRemove).forEach(this::dropCycle);
078        // this is needed to update first cycle, it calls directoryListing.refresh()
079        queue.createTailer();
080    }
081
082    protected void dropCycle(Integer cycle) {
083        File file = getFileForCycle(cycle);
084        if (!file.exists()) {
085            return;
086        }
087        log.info(String.format("Deleting Chronicle file: %s according to retention: %s", file.getAbsolutePath(),
088                retention));
089        try {
090            Files.delete(file.toPath());
091            queue.refreshDirectlyListing();
092            log.debug(file + " deleted");
093        } catch (IOException | SecurityException e) {
094            log.warn(String.format("Unable to delete Chronicle file: %s, %s", file.getAbsolutePath(), e.getMessage()));
095        }
096    }
097
098    protected List<Integer> getAllCycles() {
099        List<Integer> ret = new ArrayList<>();
100        try {
101            NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle());
102            allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue()));
103            return ret;
104        } catch (ParseException e) {
105            throw new StreamRuntimeException("Fail to list cycles for queue: " + queue, e);
106        }
107    }
108
109    protected File getFileForCycle(int cycle) {
110        WireStore store = queue.storeForCycle(cycle, queue.epoch(), false);
111        return (store != null) ? store.file() : null;
112    }
113
114    @Override
115    public void onReleased(int cycle, File file) {
116        if (queue == null || queue.isClosed() || retention.disable()) {
117            return;
118        }
119        if (log.isDebugEnabled()) {
120            log.debug(String.format("Release Chronicle file: %s, cycle: %d", file, cycle));
121        }
122        if (checkPurge()) {
123            purge();
124        }
125    }
126
127    protected boolean checkPurge() {
128        // there is no need to purge more than the cycle length (which is duration in ms)
129        if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) {
130            return true;
131        }
132        if (log.isDebugEnabled()) {
133            log.debug("Skipping purge already done in within cycle duration: " + purgedStamp);
134        }
135        return false;
136    }
137
138}