001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.NavigableSet;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.lib.stream.StreamRuntimeException;
032
033import net.openhft.chronicle.queue.impl.StoreFileListener;
034import net.openhft.chronicle.queue.impl.WireStore;
035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
036
037/**
038 * @since 9.3
039 */
040public class ChronicleRetentionListener implements StoreFileListener {
041    private static final Log log = LogFactory.getLog(ChronicleRetentionListener.class);
042
043    protected final ChronicleRetentionDuration retention;
044
045    protected SingleChronicleQueue queue;
046
047    protected long purgedStamp;
048
049    public ChronicleRetentionListener(ChronicleRetentionDuration retention) {
050        this.retention = retention;
051    }
052
053    public void setQueue(SingleChronicleQueue queue) {
054        this.queue = queue;
055    }
056
057    @Override
058    public void onAcquired(int cycle, File file) {
059        if (queue == null || retention.disable()) {
060            return;
061        }
062        if (log.isDebugEnabled()) {
063            log.debug(String.format("Acquire Chronicle file: %s, cycle: %s", file, cycle));
064        }
065    }
066
067    public synchronized void purge() {
068        if (queue == null || queue.isClosed() || retention.disable()) {
069            return;
070        }
071        List<Integer> cycles = getAllCycles();
072        int cyclesToRemove = cycles.size() - retention.getRetentionCycles();
073        if (cyclesToRemove <= 0) {
074            return;
075        }
076        purgedStamp = System.currentTimeMillis();
077        cycles.subList(0, cyclesToRemove).forEach(this::dropCycle);
078        // this is needed to update first cycle, it calls directoryListing.refresh()
079        queue.createTailer();
080    }
081
082    protected void dropCycle(Integer cycle) {
083        File file = getFileForCycle(cycle);
084        if (!file.exists()) {
085            return;
086        }
087        log.info(String.format("Deleting Chronicle file: %s according to retention: %s", file.getAbsolutePath(),
088                retention));
089        try {
090            Files.delete(file.toPath());
091            log.debug(file + " deleted");
092        } catch (IOException | SecurityException e) {
093            log.warn(String.format("Unable to delete Chronicle file: %s, %s", file.getAbsolutePath(), e.getMessage()));
094        }
095    }
096
097    protected List<Integer> getAllCycles() {
098        List<Integer> ret = new ArrayList<>();
099        try {
100            NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle());
101            allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue()));
102            return ret;
103        } catch (ParseException e) {
104            throw new StreamRuntimeException("Fail to list cycles for queue: " + queue, e);
105        }
106    }
107
108    protected File getFileForCycle(int cycle) {
109        WireStore store = queue.storeForCycle(cycle, queue.epoch(), false);
110        return (store != null) ? store.file() : null;
111    }
112
113    @Override
114    public void onReleased(int cycle, File file) {
115        if (queue == null || queue.isClosed() || retention.disable()) {
116            return;
117        }
118        if (log.isDebugEnabled()) {
119            log.debug(String.format("Release Chronicle file: %s, cycle: %d", file, cycle));
120        }
121        if (checkPurge()) {
122            purge();
123        }
124    }
125
126    protected boolean checkPurge() {
127        // there is no need to purge more than the cycle length (which is duration in ms)
128        if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) {
129            return true;
130        }
131        if (log.isDebugEnabled()) {
132            log.debug("Skipping purge already done in within cycle duration: " + purgedStamp);
133        }
134        return false;
135    }
136
137}