/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.apache.commons.io.FileUtils.deleteDirectory;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.METADATA_FILE;

import java.io.Externalizable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogConfig;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager that stores each Log as a directory of Chronicle Queue files located under the base path of a
 * {@link ChronicleLogConfig}.
 *
 * @since 9.3
 */
public class ChronicleLogManager extends AbstractLogManager {
    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);

    /** All configurations known to this manager, in the order they were given. */
    protected final List<ChronicleLogConfig> configs;

    /** Configuration used when no entry of {@link #configs} matches a Log name. */
    protected final ChronicleLogConfig defaultConfig;

    /**
     * Creates a manager with a single default configuration rooted at {@code basePath} and no explicit retention.
     *
     * @param basePath the base path.
     */
    public ChronicleLogManager(Path basePath) {
        this(basePath, null);
    }

    /**
     * Constructor
     *
     * @param basePath the base path.
     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
     *            retention duration expires, the older files are candidates for being purged. The property can be
     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
     *            in hours and 'd' in days)
     */
    public ChronicleLogManager(Path basePath, String retentionDuration) {
        this(Collections.singletonList(
                new ChronicleLogConfig("unknown", true, Collections.emptyList(), basePath, retentionDuration)));
    }

    /**
     * Creates a manager from a list of configurations.
     *
     * @param configs the configurations; must contain at least one element.
     * @throws IllegalArgumentException if {@code configs} is empty.
     */
    public ChronicleLogManager(List<ChronicleLogConfig> configs) {
        this.configs = configs;
        this.defaultConfig = findDefaultConfig();
    }

    /**
     * Selects the default configuration: the last one flagged as default, or the last configuration of the list when
     * none is flagged.
     *
     * @return the default configuration.
     * @throws IllegalArgumentException if there is no configuration at all.
     */
    protected ChronicleLogConfig findDefaultConfig() {
        List<ChronicleLogConfig> defaultConfigs = configs.stream()
                                                         .filter(LogConfig::isDefault)
                                                         .collect(Collectors.toList());
        if (defaultConfigs.isEmpty()) {
            if (configs.isEmpty()) {
                // fail with an explicit message instead of an IndexOutOfBoundsException on get(-1)
                throw new IllegalArgumentException("At least one ChronicleLogConfig is required");
            }
            return configs.get(configs.size() - 1);
        }
        // use the last default config
        return defaultConfigs.get(defaultConfigs.size() - 1);
    }

    /**
     * Returns the first configuration matching the Log name, falling back on the default configuration.
     *
     * @param name the Log name.
     */
    protected ChronicleLogConfig getConfig(Name name) {
        return configs.stream().filter(config -> config.match(name)).findFirst().orElse(defaultConfig);
    }

    /**
     * Returns the first configuration matching the Log name and consumer group, falling back on the default
     * configuration.
     *
     * @param name the Log name.
     * @param group the consumer group.
     */
    protected ChronicleLogConfig getConfig(Name name, Name group) {
        return configs.stream().filter(config -> config.match(name, group)).findFirst().orElse(defaultConfig);
    }

    /**
     * Recursively deletes a Chronicle Queues directory, refusing to do so when it directly contains a regular file
     * that is not a Chronicle log file.
     *
     * @param basePath the directory to remove.
     * @throws IllegalArgumentException if the directory contains unknown files, or if it cannot be listed or deleted.
     */
    protected static void deleteQueueBasePath(Path basePath) {
        try {
            log.info("Removing Chronicle Queues directory: " + basePath);
            // Performs a recursive delete if the directory contains only chronicles files
            try (Stream<Path> paths = Files.list(basePath)) {
                // short-circuits on the first unknown file instead of counting all of them
                boolean hasUnknownFile = paths.anyMatch(
                        path -> path.toFile().isFile() && !isChronicleLogFile(path));
                if (hasUnknownFile) {
                    String msg = "ChronicleLog basePath: " + basePath
                            + " contains unknown files, please choose another basePath";
                    log.error(msg);
                    throw new IllegalArgumentException(msg);
                }
            }
            deleteDirectory(basePath.toFile());
        } catch (IOException e) {
            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error(msg, e);
            throw new IllegalArgumentException(msg, e);
        }
    }

    /**
     * Tells whether the file name of the path is a Chronicle log file: a {@code .cq4} or {@code .cq4t} file, or the
     * metadata file.
     *
     * @param path the path to check.
     */
    protected static boolean isChronicleLogFile(Path path) {
        String filename = path.getFileName().toString();
        return filename.endsWith(".cq4") || filename.endsWith(".cq4t") || METADATA_FILE.equals(filename);
    }

    /**
     * Returns the absolute base path of the default configuration.
     */
    public String getBasePath() {
        return defaultConfig.getBasePath().toAbsolutePath().toString();
    }

    /**
     * A Log exists when its directory can be listed and contains at least one entry.
     *
     * @param name the Log name.
     * @return {@code false} when the directory is empty or cannot be listed.
     */
    @Override
    public boolean exists(Name name) {
        ChronicleLogConfig config = getConfig(name);
        try (Stream<Path> paths = Files.list(config.getBasePath().resolve(name.getId()))) {
            // stops at the first entry, no need to count the whole directory
            return paths.findAny().isPresent();
        } catch (IOException e) {
            // a directory that cannot be listed is reported as a non existing Log
            return false;
        }
    }

    /**
     * Creates the Log by creating an appender and closing it immediately.
     *
     * @param name the Log name.
     * @param size the size passed to the appender creation.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void create(Name name, int size) {
        ChronicleLogConfig config = getConfig(name);
        ChronicleLogAppender.create(config, name, size, NO_CODEC).close();
    }

    /**
     * Returns the number of partitions reported by {@link ChronicleLogAppender} for the Log directory.
     *
     * @param name the Log name.
     */
    @Override
    protected int getSize(Name name) {
        ChronicleLogConfig config = getConfig(name);
        return ChronicleLogAppender.partitions(config.getBasePath().resolve(name.getId()));
    }

    /**
     * Deletes the Log directory when it exists.
     *
     * @param name the Log name.
     * @return {@code true} if the Log directory existed and has been removed, {@code false} if there is no such
     *         directory.
     * @throws IllegalArgumentException if the directory contains unknown files or cannot be removed.
     */
    @Override
    public boolean delete(Name name) {
        ChronicleLogConfig config = getConfig(name);
        Path path = config.getBasePath().resolve(name.getId());
        if (path.toFile().isDirectory()) {
            deleteQueueBasePath(path);
            return true;
        }
        return false;
    }

    /**
     * Computes the lag of a consumer group on a single partition.
     * <p>
     * The position is the last committed offset read from the group offset tracker when it exists. When there is no
     * offset tracker, or when the committed offset is 0, the first offset of the partition is used instead.
     *
     * @param name the Log name.
     * @param partition the partition index.
     * @param group the consumer group.
     */
    @SuppressWarnings("unchecked")
    protected LogLag getLagForPartition(Name name, int partition, Name group) {
        ChronicleLogConfig config = getConfig(name);
        long pos;
        Path path = config.getBasePath().resolve(name.getId());
        if (!ChronicleLogOffsetTracker.exists(path, group)) {
            pos = 0;
        } else {
            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
                    group, ChronicleRetentionDuration.disableOf(config.getRetention()))) {
                pos = offsetTracker.readLastCommittedOffset();
            }
        }
        try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.openWithoutRetention(config, name,
                NO_CODEC)) {
            // this triggers an acquire/release on cycle
            long end = appender.endOffset(partition);
            if (pos == 0) {
                pos = appender.firstOffset(partition);
            }
            long lag = appender.countMessages(partition, pos, end);
            long firstOffset = appender.firstOffset(partition);
            long endMessages = appender.countMessages(partition, firstOffset, end);
            return new LogLag(pos, end, lag, endMessages);
        }
    }

    /**
     * Returns the lag of the consumer group for each partition of the Log, in partition order.
     *
     * @param name the Log name.
     * @param group the consumer group.
     */
    @Override
    public List<LogLag> getLagPerPartition(Name name, Name group) {
        int size = size(name);
        List<LogLag> ret = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            ret.add(getLagForPartition(name, i, group));
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogManager{" + "configs=" + configs + ", defaultConfig=" + defaultConfig + '}';
    }

    /**
     * Lists the Log names found as sub directories of the base path of every configuration, without duplicates.
     *
     * @throws IllegalArgumentException if a directory name is rejected by {@link Name#ofId}.
     */
    @Override
    public List<Name> listAllNames() {
        Set<Name> names = new HashSet<>();
        for (ChronicleLogConfig config : configs) {
            try (Stream<Path> paths = Files.list(config.getBasePath())) {
                paths.filter(Files::isDirectory)
                     .map(Path::getFileName)
                     .map(Path::toString)
                     .map(Name::ofId)
                     .forEach(names::add);
            } catch (IOException e) {
                // best effort: a base path that cannot be listed contributes no name
                log.info("Nothing to list in CQ config: " + config);
            } catch (IllegalArgumentException e) {
                log.error("Invalid log name in " + config.getBasePath(), e);
                throw e;
            }
        }
        return new ArrayList<>(names);
    }

    /**
     * Lists the consumer groups of a Log by looking for offset tracker directories inside the Log directory.
     *
     * @param name the Log name.
     * @throws IllegalArgumentException if the Log directory does not exist or cannot be listed.
     */
    @Override
    public List<Name> listConsumerGroups(Name name) {
        ChronicleLogConfig config = getConfig(name);
        Path logRoot = config.getBasePath().resolve(name.getId());
        if (!logRoot.toFile().exists()) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        try (Stream<Path> paths = Files.list(logRoot)) {
            return paths.filter(Files::isDirectory)
                        .map(Path::getFileName)
                        .map(Path::toString)
                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
                        .map(Name::ofId)
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot access Log: " + name, e);
        }
    }

    /**
     * Opens an appender on the Log using the configuration matching its name.
     *
     * @param name the Log name.
     * @param codec the codec handed to the appender.
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec) {
        ChronicleLogConfig config = getConfig(name);
        return ChronicleLogAppender.open(config, name, codec);
    }

    /**
     * Creates one tailer per partition from the appender of each partition's Log. A single partition yields its
     * tailer directly, otherwise the tailers are wrapped into a {@link ChronicleCompoundLogTailer}.
     *
     * @param partitions the partitions to tail.
     * @param group the consumer group.
     * @param codec the codec handed to the appenders and tailers.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group,
            Codec<M> codec) {
        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
        for (LogPartition partition : partitions) {
            ChronicleLogAppender<M> appender = (ChronicleLogAppender<M>) getAppender(partition.name(), codec);
            pTailers.add((ChronicleLogTailer<M>) appender.createTailer(partition, group, codec));
        }
        if (pTailers.size() == 1) {
            return pTailers.iterator().next();
        }
        return new ChronicleCompoundLogTailer<>(pTailers, group);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
            RebalanceListener listener, Codec<M> codec) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }

}