001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.File; 022import java.io.IOException; 023import java.nio.file.Files; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.NavigableSet; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import net.openhft.chronicle.queue.impl.StoreFileListener; 033import net.openhft.chronicle.queue.impl.WireStore; 034import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 035 036/** 037 * @since 9.3 038 */ 039public class ChronicleRetentionListener implements StoreFileListener { 040 private static final Log log = LogFactory.getLog(ChronicleRetentionListener.class); 041 042 protected final ChronicleRetentionDuration retention; 043 044 protected SingleChronicleQueue queue; 045 046 protected long purgedStamp; 047 048 public ChronicleRetentionListener(ChronicleRetentionDuration retention) { 049 this.retention = retention; 050 } 051 052 public void setQueue(SingleChronicleQueue queue) { 053 this.queue = queue; 054 } 055 056 @Override 057 public void onAcquired(int cycle, File file) { 058 if (queue == null || retention.disable()) { 059 return; 060 } 061 if (log.isDebugEnabled()) { 062 log.debug(String.format("Acquire Chronicle file: %s, cycle: %s", file, cycle)); 063 } 064 } 065 066 public synchronized void purge() { 067 if (queue == null || retention.disable()) { 068 return; 069 } 070 List<Integer> cycles = getAllCycles(); 071 int cyclesToRemove = cycles.size() - retention.getRetentionCycles(); 072 if (cyclesToRemove <= 0) { 073 return; 074 } 075 purgedStamp = System.currentTimeMillis(); 076 cycles.subList(0, cyclesToRemove).forEach(this::dropCycle); 077 // this is needed to update first cycle, it calls directoryListing.refresh() 078 queue.createTailer(); 079 } 080 081 protected void dropCycle(Integer cycle) { 082 File file = getFileForCycle(cycle); 083 if (!file.exists()) { 084 return; 085 } 086 log.info(String.format("Deleting Chronicle file: %s according to retention: %s", file.getAbsolutePath(), 087 retention)); 088 try { 089 Files.delete(file.toPath()); 090 log.debug(file + " deleted"); 091 } catch (IOException | SecurityException e) { 092 log.warn(String.format("Unable to delete Chronicle file: %s, %s", file.getAbsolutePath(), e.getMessage())); 093 } 094 } 095 096 protected List<Integer> getAllCycles() { 097 List<Integer> ret = new ArrayList<>(); 098 try { 099 NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle()); 100 allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue())); 101 return ret; 102 } catch (ParseException e) { 103 throw new RuntimeException("Fail to list cycles for queue: " + queue, e); 104 } 105 } 106 107 protected File getFileForCycle(int cycle) { 108 WireStore store = queue.storeForCycle(cycle, queue.epoch(), false); 109 return (store != null) ? store.file() : null; 110 } 111 112 @Override 113 public void onReleased(int cycle, File file) { 114 if (queue == null || retention.disable()) { 115 return; 116 } 117 if (log.isDebugEnabled()) { 118 log.debug(String.format("Release Chronicle file: %s, cycle: %d", file, cycle)); 119 } 120 if (checkPurge()) { 121 purge(); 122 } 123 } 124 125 protected boolean checkPurge() { 126 // there is no need to purge more than the cycle length (which is duration in ms) 127 if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) { 128 return true; 129 } 130 if (log.isDebugEnabled()) { 131 log.debug("Skipping purge already done in within cycle duration: " + purgedStamp); 132 } 133 return false; 134 } 135 136}