/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.log.chronicle;

import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

/**
 * Chronicle Queue implementation of LogAppender.
 * <p>
 * A log is stored as a base directory holding one Chronicle Queue sub directory per partition, each named with the
 * {@link #PARTITION_PREFIX} followed by the two-digit partition index.
 *
 * @since 9.3
 */
public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);

    /** Prefix of the partition directory names inside the base path. */
    protected static final String PARTITION_PREFIX = "P-";

    /** Upper bound, in milliseconds, of the pause between two polls in {@link #waitFor}. */
    protected static final int POLL_INTERVAL_MS = 100;

    /** Maximum number of partitions accepted when creating a log. */
    protected static final int MAX_PARTITIONS = 100;

    /** Field name under which a message is written when no codec is configured. */
    public static final String MSG_KEY = "msg";

    protected final List<ChronicleQueue> partitions;

    protected final int nbPartitions;

    protected final File basePath;

    protected final String name;

    // keep track of created tailers to make sure they are closed before the log
    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final ChronicleRetentionDuration retention;

    protected final Codec<M> codec;

    protected volatile boolean closed;

    /**
     * Opens an existing log when {@code size} is 0, otherwise creates a new log with {@code size} partitions.
     *
     * @param codec the codec used to encode messages, must not be null
     * @param basePath the directory holding the partition queues
     * @param size 0 to open an existing log, else the number of partitions to create
     * @param retention the retention applied to the queues
     * @throws NullPointerException if {@code codec} is null
     * @throws IllegalArgumentException if the log to open does not exist, if the log to create already exists, if
     *             {@code size} exceeds {@link #MAX_PARTITIONS} or if the base directory cannot be created
     */
    protected ChronicleLogAppender(Codec<M> codec, File basePath, int size, ChronicleRetentionDuration retention) {
        // validate the codec before touching the file system so a bad call leaves no directory behind
        this.codec = Objects.requireNonNull(codec, "codec");
        if (size == 0) {
            // open
            if (!exists(basePath)) {
                throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
            }
            this.nbPartitions = partitions(basePath);
        } else {
            // create
            if (size > MAX_PARTITIONS) {
                throw new IllegalArgumentException(
                        String.format("Cannot create more than: %d partitions for log: %s, requested: %d",
                                MAX_PARTITIONS, basePath, size));
            }
            if (exists(basePath)) {
                throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
            }
            if (!basePath.exists() && !basePath.mkdirs()) {
                throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
            }
            this.nbPartitions = size;
        }
        this.name = basePath.getName();
        this.basePath = basePath;
        this.retention = retention;
        partitions = new ArrayList<>(this.nbPartitions);
        if (log.isDebugEnabled()) {
            log.debug(((size == 0) ? "Opening: " : "Creating: ") + toString());
        }
        initPartitions();
    }

    /**
     * Builds one Chronicle Queue per partition under the base path and registers it in {@link #partitions}.
     * <p>
     * When retention is enabled the queue is built with the retention roll cycle and a
     * {@link ChronicleRetentionListener} as store file listener.
     *
     * @throws IllegalArgumentException if a partition directory cannot be created
     */
    protected void initPartitions() {
        for (int i = 0; i < nbPartitions; i++) {
            File path = new File(basePath, String.format("%s%02d", PARTITION_PREFIX, i));
            if (retention.disable()) {
                partitions.add(SingleChronicleQueueBuilder.binary(path).build());
            } else {
                ChronicleRetentionListener listener = new ChronicleRetentionListener(retention);
                SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                                                                        .rollCycle(retention.getRollCycle())
                                                                        .storeFileListener(listener)
                                                                        .build();
                listener.setQueue(queue);
                partitions.add(queue);
            }
            try {
                // make sure the directory is created so we can count the partitions
                Files.createDirectories(path.toPath());
            } catch (IOException e) {
                throw new IllegalArgumentException("Cannot create directory: " + path.getAbsolutePath(), e);
            }
        }
    }

    /**
     * Tells whether the base path is a directory that contains at least one entry.
     *
     * @param basePath the directory to check
     * @return {@code true} if {@code basePath} is a non empty directory, {@code false} otherwise, including when the
     *         directory content cannot be listed
     */
    protected static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File#list returns null when an I/O error occurs, treat an unreadable directory as not existing
        String[] children = basePath.list();
        return children != null && children.length > 0;
    }

    /**
     * Create a new log with the given retention.
     *
     * @param size the number of partitions, passing 0 opens an existing log instead
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size,
            ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, size, retention);
    }

    /**
     * Create a new log with retention disabled.
     *
     * @param size the number of partitions, passing 0 opens an existing log instead
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath, int size) {
        return new ChronicleLogAppender<>(codec, basePath, size, ChronicleRetentionDuration.DISABLE);
    }

    /**
     * Open an existing log with retention disabled.
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath) {
        return new ChronicleLogAppender<>(codec, basePath, 0, ChronicleRetentionDuration.DISABLE);
    }

    /**
     * Open an existing log with the given retention.
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath,
            ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, 0, retention);
    }

    /**
     * Returns the path of the directory that holds the partition queues.
     */
    public String getBasePath() {
        return basePath.getPath();
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public int size() {
        return nbPartitions;
    }

    /**
     * Appends a message to a partition.
     *
     * @return the offset of the appended message, built from the last index appended on the partition queue
     */
    @Override
    public LogOffset append(int partition, M message) {
        ExcerptAppender appender = partitions.get(partition).acquireAppender();
        if (NO_CODEC.equals(codec)) {
            // default format for backward compatibility
            appender.writeDocument(w -> w.write(MSG_KEY).object(message));
        } else {
            appender.writeDocument(w -> w.write().bytes(codec.encode(message)));
        }
        long offset = appender.lastIndexAppended();
        LogOffset ret = new LogOffsetImpl(name, partition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", ret, message));
        }
        return ret;
    }

    /**
     * Creates a tailer on a partition for a consumer group and registers it so that it is closed along with this
     * appender.
     *
     * @param codec the codec handed to the tailer, it is used instead of the codec of this appender
     */
    public LogTailer<M> createTailer(LogPartition partition, String group, Codec<M> codec) {
        return addTailer(new ChronicleLogTailer<>(codec, basePath.toString(),
                partitions.get(partition.partition()).createTailer(), partition, group, retention));
    }

    /**
     * Returns the index reported by a new tailer moved to the end of the partition queue.
     */
    public long endOffset(int partition) {
        return partitions.get(partition).createTailer().toEnd().index();
    }

    /**
     * Returns the first index of the partition queue, or 0 when the queue reports {@code Long.MAX_VALUE}.
     */
    public long firstOffset(int partition) {
        long ret = partitions.get(partition).firstIndex();
        // NOTE(review): Long.MAX_VALUE is presumably what Chronicle reports for a queue without data - confirm
        if (ret == Long.MAX_VALUE) {
            return 0;
        }
        return ret;
    }

    /**
     * Counts the excerpts of a partition between two offsets.
     *
     * @return the count reported by the queue, or 0 when the queue raises an {@link IllegalStateException}
     */
    public long countMessages(int partition, long lowerOffset, long upperOffset) {
        SingleChronicleQueue queue = (SingleChronicleQueue) partitions.get(partition);
        try {
            return queue.countExcerpts(lowerOffset, upperOffset);
        } catch (IllegalStateException e) {
            if (log.isDebugEnabled()) {
                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
            }
            return 0;
        }
    }

    /**
     * Registers a tailer so that {@link #close()} closes it, and returns it.
     */
    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
        tailers.add(tailer);
        return tailer;
    }

    /**
     * Polls the committed offset of a consumer group until it reaches the given offset or the timeout elapses.
     *
     * @param offset the offset to wait for
     * @param group the consumer group whose committed offset is read
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset is processed before the timeout, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        long offsetPosition = offset.offset();
        int partition = offset.partition().partition();
        try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(basePath.toString(), partition,
                group)) {
            if (isProcessed(offsetTracker, offsetPosition)) {
                return true;
            }
            final long timeoutMs = timeout.toMillis();
            final long startMs = System.currentTimeMillis();
            // saturate instead of overflowing to a negative deadline when the timeout is huge
            final long deadline = timeoutMs > Long.MAX_VALUE - startMs ? Long.MAX_VALUE : startMs + timeoutMs;
            for (long nowMs = System.currentTimeMillis(); nowMs < deadline; nowMs = System.currentTimeMillis()) {
                // never sleep past the deadline
                Thread.sleep(Math.min(POLL_INTERVAL_MS, deadline - nowMs));
                if (isProcessed(offsetTracker, offsetPosition)) {
                    return true;
                }
            }
        }
        return false;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    @Override
    public Codec<M> getCodec() {
        return codec;
    }

    /**
     * Tells whether the last committed offset read from the tracker is strictly positive and has reached the given
     * offset.
     */
    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return last > 0 && last >= offset;
    }

    /**
     * Closes the registered tailers first, then the partition queues, and marks this appender as closed.
     */
    @Override
    public void close() {
        if (log.isDebugEnabled()) {
            log.debug("Closing: " + toString());
        }
        tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
        tailers.clear();
        partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
        partitions.clear();
        closed = true;
    }

    /**
     * Counts the partition directories, the ones whose name starts with {@link #PARTITION_PREFIX}, found directly
     * under the base path.
     *
     * @param basePath the directory of the log
     * @return the number of partitions, always greater than 0
     * @throws IllegalArgumentException if the directory cannot be listed or contains no partition directory
     */
    public static int partitions(File basePath) {
        int ret;
        try (Stream<Path> paths = Files.list(basePath.toPath())) {
            ret = (int) paths.filter(
                    path -> (path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX)))
                             .count();
            if (ret == 0) {
                throw new IOException("No chronicles queues file found");
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogAppender{" + "nbPartitions=" + nbPartitions + ", basePath=" + basePath + ", name='" + name
                + '\'' + ", retention=" + retention + ", closed=" + closed + ", codec=" + codec + '}';
    }

    /**
     * Returns the retention this appender was built with.
     */
    public ChronicleRetentionDuration getRetention() {
        return retention;
    }
}