001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.chronicle; 020 021import static org.apache.commons.io.FileUtils.deleteDirectory; 022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 023import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.METADATA_FILE; 024 025import java.io.Externalizable; 026import java.io.IOException; 027import java.nio.file.Files; 028import java.nio.file.Path; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.List; 032import java.util.stream.Collectors; 033import java.util.stream.Stream; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.lib.stream.codec.Codec; 038import org.nuxeo.lib.stream.log.LogLag; 039import org.nuxeo.lib.stream.log.LogPartition; 040import org.nuxeo.lib.stream.log.LogTailer; 041import org.nuxeo.lib.stream.log.RebalanceListener; 042import org.nuxeo.lib.stream.log.internals.AbstractLogManager; 043import org.nuxeo.lib.stream.log.internals.CloseableLogAppender; 044 045/** 046 * @since 9.3 047 */ 048public class ChronicleLogManager extends AbstractLogManager { 049 private static final Log log = LogFactory.getLog(ChronicleLogManager.class); 050 051 protected final Path basePath; 052 053 protected final ChronicleRetentionDuration retention; 054 055 public ChronicleLogManager(Path basePath) { 056 this(basePath, null); 057 } 058 059 /** 060 * Constructor 061 * 062 * @param basePath the base path. 063 * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the 064 * retention duration expires, the older files are candidates for being purged. The property can be 065 * expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h' 066 * in hours and 'd' in days) 067 */ 068 public ChronicleLogManager(Path basePath, String retentionDuration) { 069 this.basePath = basePath; 070 this.retention = new ChronicleRetentionDuration(retentionDuration); 071 } 072 073 protected static void deleteQueueBasePath(Path basePath) { 074 try { 075 log.info("Removing Chronicle Queues directory: " + basePath); 076 // Performs a recursive delete if the directory contains only chronicles files 077 try (Stream<Path> paths = Files.list(basePath)) { 078 int count = (int) paths.filter(path -> (path.toFile().isFile() && !isChronicleLogFile(path))).count(); 079 if (count > 0) { 080 String msg = "ChronicleLog basePath: " + basePath 081 + " contains unknown files, please choose another basePath"; 082 log.error(msg); 083 throw new IllegalArgumentException(msg); 084 } 085 } 086 deleteDirectory(basePath.toFile()); 087 } catch (IOException e) { 088 String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage(); 089 log.error(msg, e); 090 throw new IllegalArgumentException(msg, e); 091 } 092 } 093 094 protected static boolean isChronicleLogFile(Path path) { 095 String filename = path.getFileName().toString(); 096 return filename.endsWith(".cq4") || filename.endsWith(".cq4t") || METADATA_FILE.equals(filename); 097 } 098 099 public String getBasePath() { 100 return basePath.toAbsolutePath().toString(); 101 } 102 103 @Override 104 public boolean exists(String name) { 105 try (Stream<Path> paths = Files.list(basePath.resolve(name))) { 106 return paths.count() > 0; 107 } catch (IOException e) { 108 return false; 109 } 110 } 111 112 @SuppressWarnings("unchecked") 113 @Override 114 public void create(String name, int size) { 115 ChronicleLogAppender.create(NO_CODEC, basePath.resolve(name).toFile(), size, retention).close(); 116 } 117 118 @Override 119 protected int getSize(String name) { 120 return ChronicleLogAppender.partitions(basePath.resolve(name)); 121 } 122 123 @Override 124 public boolean delete(String name) { 125 Path path = basePath.resolve(name); 126 if (path.toFile().isDirectory()) { 127 deleteQueueBasePath(path); 128 return true; 129 } 130 return false; 131 } 132 133 @SuppressWarnings("unchecked") 134 protected LogLag getLagForPartition(String name, int partition, String group) { 135 long pos; 136 Path path = basePath.resolve(name); 137 if (!ChronicleLogOffsetTracker.exists(path, group)) { 138 pos = 0; 139 } else { 140 try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition, 141 group, ChronicleRetentionDuration.disableOf(retention))) { 142 pos = offsetTracker.readLastCommittedOffset(); 143 } 144 } 145 try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.open(NO_CODEC, 146 basePath.resolve(name).toFile())) { 147 // this trigger an acquire/release on cycle 148 long end = appender.endOffset(partition); 149 if (pos == 0) { 150 pos = appender.firstOffset(partition); 151 } 152 long lag = appender.countMessages(partition, pos, end); 153 long firstOffset = appender.firstOffset(partition); 154 long endMessages = appender.countMessages(partition, firstOffset, end); 155 return new LogLag(pos, end, lag, endMessages); 156 } 157 } 158 159 @Override 160 public List<LogLag> getLagPerPartition(String name, String group) { 161 int size = size(name); 162 List<LogLag> ret = new ArrayList<>(size); 163 for (int i = 0; i < size; i++) { 164 ret.add(getLagForPartition(name, i, group)); 165 } 166 return ret; 167 } 168 169 @Override 170 public String toString() { 171 return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}'; 172 } 173 174 @Override 175 public List<String> listAll() { 176 try (Stream<Path> paths = Files.list(basePath)) { 177 return paths.filter(Files::isDirectory) 178 .map(Path::getFileName) 179 .map(Path::toString) 180 .collect(Collectors.toList()); 181 } catch (IOException e) { 182 throw new IllegalArgumentException("Invalid base path: " + basePath, e); 183 } 184 } 185 186 @Override 187 public List<String> listConsumerGroups(String name) { 188 Path logRoot = basePath.resolve(name); 189 if (!logRoot.toFile().exists()) { 190 throw new IllegalArgumentException("Unknown Log: " + name); 191 } 192 try (Stream<Path> paths = Files.list(logRoot)) { 193 return paths.filter(Files::isDirectory) 194 .map(Path::getFileName) 195 .map(Path::toString) 196 .filter(ChronicleLogOffsetTracker::isOffsetTracker) 197 .map(ChronicleLogOffsetTracker::getGroupFromDirectory) 198 .collect(Collectors.toList()); 199 } catch (IOException e) { 200 throw new IllegalArgumentException("Cannot access Log: " + name, e); 201 } 202 } 203 204 @Override 205 public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) { 206 return ChronicleLogAppender.open(codec, basePath.resolve(name).toFile(), retention); 207 } 208 209 @Override 210 @SuppressWarnings("unchecked") 211 protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group, 212 Codec<M> codec) { 213 Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size()); 214 partitions.forEach(partition -> pTailers.add( 215 (ChronicleLogTailer<M>) ((ChronicleLogAppender<M>) getAppender(partition.name(), codec)).createTailer( 216 partition, group, codec))); 217 if (pTailers.size() == 1) { 218 return pTailers.iterator().next(); 219 } 220 return new ChronicleCompoundLogTailer<>(pTailers, group); 221 } 222 223 @Override 224 protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, 225 RebalanceListener listener, Codec<M> codec) { 226 throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation"); 227 228 } 229 230}