/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.apache.commons.io.FileUtils.deleteDirectory;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager whose Logs are stored as sub-directories of a base path on the file system, one directory per Log
 * name.
 *
 * @since 9.3
 */
public class ChronicleLogManager extends AbstractLogManager {
    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);

    protected final Path basePath;

    protected final ChronicleRetentionDuration retention;

    /**
     * Creates a manager rooted at {@code basePath}, passing a {@code null} retention duration to
     * {@link #ChronicleLogManager(Path, String)}.
     *
     * @param basePath the base path.
     */
    public ChronicleLogManager(Path basePath) {
        this(basePath, null);
    }

    /**
     * Constructor
     *
     * @param basePath the base path.
     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
     *            retention duration expires, the older files are candidates for being purged. The property can be
     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes, 'h'
     *            in hours and 'd' in days)
     */
    public ChronicleLogManager(Path basePath, String retentionDuration) {
        this.basePath = basePath;
        this.retention = new ChronicleRetentionDuration(retentionDuration);
    }

    /**
     * Recursively deletes the given directory, but only when every regular file directly under it ends with
     * {@code .cq4}.
     *
     * @param basePath the directory to remove.
     * @throws IllegalArgumentException if the directory directly contains a regular file that does not end with
     *             {@code .cq4}, or if listing or deleting the directory fails with an {@link IOException}.
     */
    protected static void deleteQueueBasePath(Path basePath) {
        try {
            log.info("Removing Chronicle Queues directory: " + basePath);
            // Performs a recursive delete if the directory contains only chronicles files
            try (Stream<Path> paths = Files.list(basePath)) {
                // anyMatch stops at the first offending file instead of counting all entries
                boolean hasUnknownFiles = paths.anyMatch(
                        path -> path.toFile().isFile() && !path.toString().endsWith(".cq4"));
                if (hasUnknownFiles) {
                    String msg = "ChronicleLog basePath: " + basePath
                            + " contains unknown files, please choose another basePath";
                    log.error(msg);
                    throw new IllegalArgumentException(msg);
                }
            }
            deleteDirectory(basePath.toFile());
        } catch (IOException e) {
            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error(msg, e);
            throw new IllegalArgumentException(msg, e);
        }
    }

    /**
     * Returns the base path as an absolute path string.
     */
    public String getBasePath() {
        return basePath.toAbsolutePath().toString();
    }

    /**
     * Returns {@code true} when the directory of the Log {@code name} can be listed and contains at least one entry.
     * Any {@link IOException} raised while listing (for instance a missing directory) is reported as {@code false}.
     */
    @Override
    public boolean exists(String name) {
        try (Stream<Path> paths = Files.list(basePath.resolve(name))) {
            // only emptiness matters, no need to walk the whole directory
            return paths.findAny().isPresent();
        } catch (IOException e) {
            return false;
        }
    }

    /**
     * Creates the Log {@code name} by creating an appender of the requested size in its directory and closing it
     * right away.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void create(String name, int size) {
        ChronicleLogAppender.create(NO_CODEC, basePath.resolve(name).toFile(), size, retention).close();
    }

    /**
     * Returns the number of partitions reported by {@code ChronicleLogAppender.partitions} for the directory of the
     * Log {@code name}.
     */
    @Override
    protected int getSize(String name) {
        return ChronicleLogAppender.partitions(basePath.resolve(name).toFile());
    }

    /**
     * Deletes the directory of the Log {@code name}.
     *
     * @return {@code true} if the directory existed and has been removed, {@code false} if there is no such directory.
     * @throws IllegalArgumentException if the directory cannot be removed, see {@link #deleteQueueBasePath(Path)}.
     */
    @Override
    public boolean delete(String name) {
        Path path = basePath.resolve(name);
        if (path.toFile().isDirectory()) {
            deleteQueueBasePath(path);
            return true;
        }
        return false;
    }

    /**
     * Computes the lag of a consumer group on a single partition.
     * <p>
     * The position is the last committed offset read from the group offset tracker. When there is no offset tracker
     * for the group, or when the committed offset is 0, the first offset of the partition is used instead.
     *
     * @param name the Log name.
     * @param partition the partition index.
     * @param group the consumer group.
     * @return a {@link LogLag} built from the position, the end offset, the number of messages between the position
     *         and the end, and the number of messages between the first offset and the end.
     */
    @SuppressWarnings("unchecked")
    protected LogLag getLagForPartition(String name, int partition, String group) {
        long pos;
        Path path = basePath.resolve(name);
        if (!ChronicleLogOffsetTracker.exists(path, group)) {
            pos = 0;
        } else {
            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
                    group)) {
                pos = offsetTracker.readLastCommittedOffset();
            }
        }
        try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.open(NO_CODEC, path.toFile())) {
            // this triggers an acquire/release on cycle
            long end = appender.endOffset(partition);
            if (pos == 0) {
                pos = appender.firstOffset(partition);
            }
            long lag = appender.countMessages(partition, pos, end);
            long firstOffset = appender.firstOffset(partition);
            long endMessages = appender.countMessages(partition, firstOffset, end);
            return new LogLag(pos, end, lag, endMessages);
        }
    }

    /**
     * Returns the lag of the consumer group for each partition of the Log {@code name}, ordered by partition index.
     */
    @Override
    public List<LogLag> getLagPerPartition(String name, String group) {
        int size = size(name);
        List<LogLag> ret = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            ret.add(getLagForPartition(name, i, group));
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}';
    }

    /**
     * Lists the names of the directories found directly under the base path.
     *
     * @throws IllegalArgumentException if the base path cannot be listed.
     */
    @Override
    public List<String> listAll() {
        try (Stream<Path> paths = Files.list(basePath)) {
            return paths.filter(Files::isDirectory)
                        .map(Path::getFileName)
                        .map(Path::toString)
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
        }
    }

    /**
     * Lists the consumer groups of the Log {@code name}, derived from the sub-directories of the Log directory that
     * are recognized as offset trackers.
     *
     * @throws IllegalArgumentException if the Log directory does not exist or cannot be listed.
     */
    @Override
    public List<String> listConsumerGroups(String name) {
        Path logRoot = basePath.resolve(name);
        if (!logRoot.toFile().exists()) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        try (Stream<Path> paths = Files.list(logRoot)) {
            return paths.filter(Files::isDirectory)
                        .map(Path::getFileName)
                        .map(Path::toString)
                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot access Log: " + name, e);
        }
    }

    /**
     * Opens an appender on the directory of the Log {@code name} using the given codec and the manager retention.
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) {
        return ChronicleLogAppender.open(codec, basePath.resolve(name).toFile(), retention);
    }

    /**
     * Creates one tailer per partition from the appender of the partition's Log. A single partition yields that
     * tailer directly, otherwise the tailers are wrapped into a {@link ChronicleCompoundLogTailer}.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group,
            Codec<M> codec) {
        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
        for (LogPartition partition : partitions) {
            ChronicleLogAppender<M> appender = (ChronicleLogAppender<M>) getAppender(partition.name(), codec);
            pTailers.add((ChronicleLogTailer<M>) appender.createTailer(partition, group, codec));
        }
        if (pTailers.size() == 1) {
            return pTailers.iterator().next();
        }
        return new ChronicleCompoundLogTailer<>(pTailers, group);
    }

    /**
     * Always fails: subscribe is not supported by this implementation.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
            RebalanceListener listener, Codec<M> codec) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }

}