/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.apache.commons.io.FileUtils.deleteDirectory;

import java.io.Externalizable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager whose logs are stored as sub-directories of a base path, one directory per log name.
 *
 * @since 9.3
 */
public class ChronicleLogManager extends AbstractLogManager {
    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);

    protected final Path basePath;

    protected final ChronicleRetentionDuration retention;

    /**
     * Creates a manager rooted at {@code basePath}, passing a {@code null} retention duration to
     * {@link #ChronicleLogManager(Path, String)}.
     *
     * @param basePath the base path.
     */
    public ChronicleLogManager(Path basePath) {
        this(basePath, null);
    }

    /**
     * Constructor
     *
     * @param basePath the base path.
     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
     *            retention duration expires, the older files are candidates for being purged. The property can be
     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
     *            in hours and 'd' in days)
     */
    public ChronicleLogManager(Path basePath, String retentionDuration) {
        this.basePath = basePath;
        this.retention = new ChronicleRetentionDuration(retentionDuration);
    }

    /**
     * Recursively deletes {@code basePath}, but only if none of its direct regular files is foreign, i.e. every
     * regular file directly under it ends with {@code .cq4}.
     *
     * @param basePath the directory to remove.
     * @throws IllegalArgumentException if the directory directly contains a regular file that is not a
     *             {@code .cq4} file, or if listing or deleting the directory fails with an {@link IOException}.
     */
    protected static void deleteQueueBasePath(Path basePath) {
        try {
            log.info("Removing Chronicle Queues directory: " + basePath);
            // Performs a recursive delete if the directory contains only chronicles files
            try (Stream<Path> paths = Files.list(basePath)) {
                boolean hasUnknownFile = paths.anyMatch(
                        path -> Files.isRegularFile(path) && !path.toString().endsWith(".cq4"));
                if (hasUnknownFile) {
                    String msg = "ChronicleLog basePath: " + basePath
                            + " contains unknown files, please choose another basePath";
                    log.error(msg);
                    throw new IllegalArgumentException(msg);
                }
            }
            deleteDirectory(basePath.toFile());
        } catch (IOException e) {
            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error(msg, e);
            throw new IllegalArgumentException(msg, e);
        }
    }

    /**
     * @return the base path as an absolute path string.
     */
    public String getBasePath() {
        return basePath.toAbsolutePath().toString();
    }

    /**
     * Tells whether the directory {@code basePath/name} can be listed and holds at least one entry.
     *
     * @param name the log name, resolved against the base path.
     * @return {@code true} if the directory has at least one entry, {@code false} if it is empty or if listing it
     *         fails with an {@link IOException}.
     */
    @Override
    public boolean exists(String name) {
        // Files.list holds an open directory handle until the stream is closed
        try (Stream<Path> entries = Files.list(basePath.resolve(name))) {
            return entries.findAny().isPresent();
        } catch (IOException e) {
            // a directory that cannot be listed is reported as a non existing log
            return false;
        }
    }

    /**
     * Creates the log by calling {@code ChronicleLogAppender.create} on {@code basePath/name} and closing the
     * returned appender right away.
     *
     * @param name the log name, resolved against the base path.
     * @param size forwarded to {@code ChronicleLogAppender.create}.
     */
    @Override
    public void create(String name, int size) {
        ChronicleLogAppender.create(basePath.resolve(name).toFile(), size, retention).close();
    }

    /**
     * Deletes the directory {@code basePath/name} using {@link #deleteQueueBasePath(Path)}.
     *
     * @param name the log name, resolved against the base path.
     * @return {@code true} if the path was a directory and has been deleted, {@code false} if it is not a directory.
     * @throws IllegalArgumentException if {@link #deleteQueueBasePath(Path)} refuses or fails to delete.
     */
    @Override
    public boolean delete(String name) {
        Path path = basePath.resolve(name);
        if (Files.isDirectory(path)) {
            deleteQueueBasePath(path);
            return true;
        }
        return false;
    }

    /**
     * Builds the {@link LogLag} of a consumer group on a single partition.
     * <p>
     * The position is the last committed offset read from the group offset tracker, or {@code 0} when no tracker
     * exists for the group. A position of {@code 0} is replaced by the appender first offset of the partition.
     *
     * @param name the log name.
     * @param partition the partition index.
     * @param group the consumer group.
     * @return a {@link LogLag} built from the position, the appender end offset, the message count between position
     *         and end, and the message count between the first offset and end.
     */
    protected LogLag getLagForPartition(String name, int partition, String group) {
        long pos;
        Path path = basePath.resolve(name);
        // only offset and count accessors are used, so the message type is irrelevant here
        ChronicleLogAppender<?> appender = (ChronicleLogAppender<?>) getAppender(name);
        if (!ChronicleLogOffsetTracker.exists(path, group)) {
            pos = 0;
        } else {
            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
                    group)) {
                pos = offsetTracker.readLastCommittedOffset();
            }
        }
        // this trigger an acquire/release on cycle
        long end = appender.endOffset(partition);
        if (pos == 0) {
            pos = appender.firstOffset(partition);
        }
        long lag = appender.countMessages(partition, pos, end);
        long firstOffset = appender.firstOffset(partition);
        long endMessages = appender.countMessages(partition, firstOffset, end);
        return new LogLag(pos, end, lag, endMessages);
    }

    /**
     * Computes the lag of a consumer group for every partition of a log.
     *
     * @param name the log name.
     * @param group the consumer group.
     * @return one {@link LogLag} per partition, in partition index order, the number of partitions being the size
     *         reported by the log appender.
     */
    @Override
    public List<LogLag> getLagPerPartition(String name, String group) {
        int size = getAppender(name).size();
        List<LogLag> ret = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            ret.add(getLagForPartition(name, i, group));
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}';
    }

    /**
     * Lists the names of the directories found directly under the base path.
     *
     * @return the directory names.
     * @throws IllegalArgumentException if the base path cannot be listed.
     */
    @Override
    public List<String> listAll() {
        // Files.list holds an open directory handle until the stream is closed
        try (Stream<Path> entries = Files.list(basePath)) {
            return entries.filter(Files::isDirectory)
                          .map(Path::getFileName)
                          .map(Path::toString)
                          .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
        }
    }

    /**
     * Lists the consumer groups of a log, derived from the sub-directories of {@code basePath/name} that
     * {@code ChronicleLogOffsetTracker.isOffsetTracker} accepts.
     *
     * @param name the log name, resolved against the base path.
     * @return the group names returned by {@code ChronicleLogOffsetTracker.getGroupFromDirectory}.
     * @throws IllegalArgumentException if {@code basePath/name} does not exist or cannot be listed.
     */
    @Override
    public List<String> listConsumerGroups(String name) {
        Path logRoot = basePath.resolve(name);
        if (!Files.exists(logRoot)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        // Files.list holds an open directory handle until the stream is closed
        try (Stream<Path> entries = Files.list(logRoot)) {
            return entries.filter(Files::isDirectory)
                          .map(Path::getFileName)
                          .map(Path::toString)
                          .filter(ChronicleLogOffsetTracker::isOffsetTracker)
                          .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
                          .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot access Log: " + name, e);
        }
    }

    /**
     * Opens an appender on the directory {@code basePath/name} with the manager retention.
     *
     * @param name the log name, resolved against the base path.
     * @return the appender returned by {@code ChronicleLogAppender.open}.
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
        return ChronicleLogAppender.open(basePath.resolve(name).toFile(), retention);
    }

    /**
     * Creates one tailer per requested partition from the appender of the partition log.
     *
     * @param partitions the partitions to tail.
     * @param group the consumer group.
     * @return the single partition tailer when exactly one was created, otherwise a
     *         {@code ChronicleCompoundLogTailer} wrapping all of them.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
            String group) {
        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
        for (LogPartition partition : partitions) {
            ChronicleLogAppender<M> appender = (ChronicleLogAppender<M>) getAppender(partition.name());
            pTailers.add((ChronicleLogTailer<M>) appender.createTailer(partition, group));
        }
        if (pTailers.size() == 1) {
            return pTailers.iterator().next();
        }
        return new ChronicleCompoundLogTailer<>(pTailers, group);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }

}