001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.File; 022import java.io.IOException; 023import java.nio.file.Files; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.NavigableSet; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.lib.stream.StreamRuntimeException; 032 033import net.openhft.chronicle.queue.impl.StoreFileListener; 034import net.openhft.chronicle.queue.impl.WireStore; 035import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 036 037/** 038 * @since 9.3 039 */ 040public class ChronicleRetentionListener implements StoreFileListener { 041 private static final Log log = LogFactory.getLog(ChronicleRetentionListener.class); 042 043 protected final ChronicleRetentionDuration retention; 044 045 protected SingleChronicleQueue queue; 046 047 protected long purgedStamp; 048 049 public ChronicleRetentionListener(ChronicleRetentionDuration retention) { 050 this.retention = retention; 051 } 052 053 public void setQueue(SingleChronicleQueue queue) { 054 this.queue = queue; 055 } 056 057 @Override 058 public void onAcquired(int cycle, File file) { 059 if (queue == null || retention.disable()) { 060 return; 061 } 062 if (log.isDebugEnabled()) { 063 log.debug(String.format("Acquire Chronicle file: %s, cycle: %s", file, cycle)); 064 } 065 } 066 067 public synchronized void purge() { 068 if (queue == null || queue.isClosed() || retention.disable() || !queue.file().exists()) { 069 return; 070 } 071 List<Integer> cycles = getAllCycles(); 072 int cyclesToRemove = cycles.size() - retention.getRetentionCycles(); 073 if (cyclesToRemove <= 0) { 074 return; 075 } 076 purgedStamp = System.currentTimeMillis(); 077 cycles.subList(0, cyclesToRemove).forEach(this::dropCycle); 078 // this is needed to update first cycle, it calls directoryListing.refresh() 079 queue.createTailer(); 080 } 081 082 protected void dropCycle(Integer cycle) { 083 File file = getFileForCycle(cycle); 084 if (!file.exists()) { 085 return; 086 } 087 log.info(String.format("Deleting Chronicle file: %s according to retention: %s", file.getAbsolutePath(), 088 retention)); 089 try { 090 Files.delete(file.toPath()); 091 queue.refreshDirectlyListing(); 092 log.debug(file + " deleted"); 093 } catch (IOException | SecurityException e) { 094 log.warn(String.format("Unable to delete Chronicle file: %s, %s", file.getAbsolutePath(), e.getMessage())); 095 } 096 } 097 098 protected List<Integer> getAllCycles() { 099 List<Integer> ret = new ArrayList<>(); 100 try { 101 NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle()); 102 allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue())); 103 return ret; 104 } catch (ParseException e) { 105 throw new StreamRuntimeException("Fail to list cycles for queue: " + queue, e); 106 } 107 } 108 109 protected File getFileForCycle(int cycle) { 110 WireStore store = queue.storeForCycle(cycle, queue.epoch(), false); 111 return (store != null) ? store.file() : null; 112 } 113 114 @Override 115 public void onReleased(int cycle, File file) { 116 if (queue == null || queue.isClosed() || retention.disable()) { 117 return; 118 } 119 if (log.isDebugEnabled()) { 120 log.debug(String.format("Release Chronicle file: %s, cycle: %d", file, cycle)); 121 } 122 if (checkPurge()) { 123 purge(); 124 } 125 } 126 127 protected boolean checkPurge() { 128 // there is no need to purge more than the cycle length (which is duration in ms) 129 if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) { 130 return true; 131 } 132 if (log.isDebugEnabled()) { 133 log.debug("Skipping purge already done in within cycle duration: " + purgedStamp); 134 } 135 return false; 136 } 137 138}