001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import java.io.File; 022import java.io.IOException; 023import java.nio.file.Files; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.NavigableSet; 028 029import org.apache.logging.log4j.LogManager; 030import org.apache.logging.log4j.Logger; 031import org.jetbrains.annotations.Nullable; 032import org.nuxeo.lib.stream.StreamRuntimeException; 033 034import net.openhft.chronicle.queue.impl.StoreFileListener; 035import net.openhft.chronicle.queue.impl.WireStore; 036import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; 037import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore; 038 039/** 040 * @since 9.3 041 */ 042public class ChronicleRetentionListener implements StoreFileListener { 043 044 private static final Logger log = LogManager.getLogger(ChronicleRetentionListener.class); 045 046 protected final ChronicleRetentionDuration retention; 047 048 protected SingleChronicleQueue queue; 049 050 protected long purgedStamp; 051 052 public ChronicleRetentionListener(ChronicleRetentionDuration retention) { 053 this.retention = retention; 054 } 055 056 public void setQueue(SingleChronicleQueue queue) { 057 this.queue = queue; 058 } 059 060 @Override 061 public void onAcquired(int cycle, File file) { 062 if (queue == null || retention.disable()) { 063 return; 064 } 065 log.debug("Acquire Chronicle file: {}, cycle: {}", file, cycle); 066 067 } 068 069 public synchronized void purge() { 070 if (queue == null || queue.isClosed() || retention.disable() || !queue.file().exists()) { 071 return; 072 } 073 List<Integer> cycles = getAllCycles(); 074 int cyclesToRemove = cycles.size() - retention.getRetentionCycles(); 075 if (cyclesToRemove <= 0) { 076 return; 077 } 078 purgedStamp = System.currentTimeMillis(); 079 cycles.subList(0, cyclesToRemove).forEach(this::dropCycle); 080 // this is needed to update first cycle, it calls directoryListing.refresh() 081 queue.createTailer().close(); 082 } 083 084 protected void dropCycle(Integer cycle) { 085 SingleChronicleQueueStore store = queue.storeForCycle(cycle, queue.epoch(), false, null); 086 if (store == null) { 087 return; 088 } 089 File file = store.file(); 090 if (file == null || !file.exists()) { 091 return; 092 } 093 log.info("Deleting Chronicle file: {} according to retention: {}", file::getAbsolutePath, () -> retention); 094 try { 095 queue.closeStore(store); 096 Files.delete(file.toPath()); 097 queue.refreshDirectoryListing(); 098 log.debug(file + " deleted"); 099 } catch (IOException | SecurityException e) { 100 log.warn("Unable to delete Chronicle file: {}, {}", file::getAbsolutePath, e::getMessage); 101 } 102 } 103 104 protected List<Integer> getAllCycles() { 105 List<Integer> ret = new ArrayList<>(); 106 try { 107 NavigableSet<Long> allCycles = queue.listCyclesBetween(queue.firstCycle(), queue.lastCycle()); 108 allCycles.iterator().forEachRemaining(cycle -> ret.add(cycle.intValue())); 109 return ret; 110 } catch (ParseException e) { 111 throw new StreamRuntimeException("Fail to list cycles for queue: " + queue, e); 112 } 113 } 114 115 @Override 116 public void onReleased(int cycle, File file) { 117 if (queue == null || queue.isClosed() || retention.disable()) { 118 return; 119 } 120 log.debug("Release Chronicle file: {}, cycle: {}", file, cycle); 121 if (checkPurge()) { 122 purge(); 123 } 124 } 125 126 protected boolean checkPurge() { 127 // there is no need to purge more than the cycle length (which is duration in ms) 128 if (System.currentTimeMillis() - purgedStamp >= retention.getRollCycle().length()) { 129 return true; 130 } 131 log.debug("Skipping purge already done in within cycle duration: {}", purgedStamp); 132 return false; 133 } 134 135}