/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;

import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX;
import static org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle.ChronicleMQManager.DEFAULT_RETENTION_DURATION;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.RollingResourcesCache;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

/**
 * Chronicle Queue implementation of MQAppender.
 * <p>
 * Each partition is stored as one Chronicle Queue in a sub directory of the base path named {@code Q-NN}. The
 * instance registers itself as {@link StoreFileListener} on every queue so that old cycle files can be purged
 * according to the retention duration each time a store file is acquired.
 * <p>
 * Note that, for performance reasons, the default assertion status of the system class loader is disabled by the
 * constructor.
 *
 * @since 9.1
 */
public class ChronicleMQAppender<M extends Externalizable> implements MQAppender<M>, StoreFileListener {
    private static final Log log = LogFactory.getLog(ChronicleMQAppender.class);

    private static final String QUEUE_PREFIX = "Q-";

    private static final int POLL_INTERVAL_MS = 100;

    private static final String SECOND_ROLLING_PERIOD = "s";

    private static final String MINUTE_ROLLING_PERIOD = "m";

    private static final String HOUR_ROLLING_PERIOD = "h";

    private static final String DAY_ROLLING_PERIOD = "d";

    private final List<ChronicleQueue> queues;

    private final int nbQueues;

    private final File basePath;

    private final String name;

    // number of roll cycles to keep, parsed from the numeric part of the retention duration (e.g. 4 for "4d")
    private final int retentionNbCycles;

    // keep track of created tailers to make sure they are closed before the mq
    private final ConcurrentLinkedQueue<ChronicleMQTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    private boolean closed = false;

    /**
     * Tells whether {@code basePath} is a directory that contains at least one entry.
     *
     * @param basePath the directory to test
     * @return {@code true} if the directory exists and is not empty
     */
    public static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File#list returns null on I/O error, even for an existing directory
        String[] children = basePath.list();
        return children != null && children.length > 0;
    }

    /**
     * Returns the path of the directory holding the partitions.
     */
    public String getBasePath() {
        return basePath.getPath();
    }

    /**
     * Creates a new mqueues with {@code size} partitions and the given retention.
     *
     * @param basePath the directory where partitions are created, must be missing or empty
     * @param size the number of partitions
     * @param retentionPolicy a number followed by a rolling period: {@code s}, {@code m}, {@code h} or {@code d}
     * @throws IllegalArgumentException if the mqueues already exists, the directory cannot be created or the
     *             retention is invalid
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size,
            String retentionPolicy) {
        return new ChronicleMQAppender<>(basePath, size, retentionPolicy);
    }

    /**
     * Creates a new mqueues with {@code size} partitions and the default retention duration.
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size) {
        return new ChronicleMQAppender<>(basePath, size, DEFAULT_RETENTION_DURATION);
    }

    /**
     * Opens an existing mqueues with the default retention duration.
     *
     * @throws IllegalArgumentException if {@code basePath} does not hold an mqueues
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> open(File basePath) {
        return new ChronicleMQAppender<>(basePath, 0, DEFAULT_RETENTION_DURATION);
    }

    /**
     * Opens an existing mqueues with the given retention duration.
     *
     * @throws IllegalArgumentException if {@code basePath} does not hold an mqueues or the retention is invalid
     */
    public static <M extends Externalizable> ChronicleMQAppender<M> open(File basePath, String retentionDuration) {
        return new ChronicleMQAppender<>(basePath, 0, retentionDuration);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public int size() {
        return nbQueues;
    }

    /**
     * Writes {@code message} to the queue of the given partition and returns the offset built from the index of
     * the appended excerpt.
     */
    @Override
    public MQOffset append(int partition, M message) {
        ExcerptAppender appender = queues.get(partition).acquireAppender();
        appender.writeDocument(w -> w.write("msg").object(message));
        long offset = appender.lastIndexAppended();
        MQOffset ret = new MQOffsetImpl(name, partition, offset);
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", ret, message));
        }
        return ret;
    }

    /**
     * Creates a tailer on the given partition for a consumer group. The tailer is registered so that it gets
     * closed by {@link #close()}.
     */
    public MQTailer<M> createTailer(MQPartition partition, String group) {
        return addTailer(new ChronicleMQTailer<>(basePath.toString(),
                queues.get(partition.partition()).createTailer(), partition, group));
    }

    private MQTailer<M> addTailer(ChronicleMQTailer<M> tailer) {
        tailers.add(tailer);
        return tailer;
    }

    /**
     * Polls the committed offset of {@code group} until it reaches {@code offset} or the timeout expires.
     *
     * @return {@code true} if the offset has been committed before the deadline
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean waitFor(MQOffset offset, String group, Duration timeout) throws InterruptedException {
        long offsetPosition = offset.offset();
        int partition = offset.partition().partition();
        try (ChronicleMQOffsetTracker offsetTracker = new ChronicleMQOffsetTracker(basePath.toString(), partition,
                group)) {
            if (isProcessed(offsetTracker, offsetPosition)) {
                return true;
            }
            final long timeoutMs = timeout.toMillis();
            final long now = System.currentTimeMillis();
            // saturate instead of overflowing on very long timeouts
            final long deadline = (timeoutMs > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + timeoutMs;
            long remaining = deadline - System.currentTimeMillis();
            while (remaining > 0) {
                // never sleep past the deadline
                Thread.sleep(Math.min(POLL_INTERVAL_MS, remaining));
                if (isProcessed(offsetTracker, offsetPosition)) {
                    return true;
                }
                remaining = deadline - System.currentTimeMillis();
            }
        }
        return false;
    }

    @Override
    public boolean closed() {
        return closed;
    }

    private boolean isProcessed(ChronicleMQOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return (last > 0) && (last >= offset);
    }

    /**
     * Closes the registered tailers first, then the underlying queues. A tailer that fails to close is logged
     * and does not prevent the others from being closed.
     */
    @Override
    public void close() throws Exception {
        log.debug("Closing queue");
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                // keep the cause in the log, best effort on the remaining tailers
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
        queues.clear();
        closed = true;
    }

    /**
     * Opens ({@code size == 0}) or creates ({@code size > 0}) the partitions under {@code basePath}.
     *
     * @param retentionDuration a number followed by a rolling period; {@code null} falls back to
     *            {@code DEFAULT_RETENTION_DURATION}
     */
    private ChronicleMQAppender(File basePath, int size, String retentionDuration) {
        if (size == 0) {
            // open
            if (!exists(basePath)) {
                String msg = "Can not open Chronicle Queues, invalid path: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            this.nbQueues = findNbQueues(basePath);
        } else {
            // creation
            if (exists(basePath)) {
                String msg = "Can not create Chronicle Queues, already exists: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            if (!basePath.exists() && !basePath.mkdirs()) {
                String msg = "Can not create Chronicle Queues in: " + basePath;
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            this.nbQueues = size;
        }
        this.name = basePath.getName();
        this.basePath = basePath;

        // a null retention used to end up in a NullPointerException when computing the roll cycle
        String retention = (retentionDuration != null) ? retentionDuration : DEFAULT_RETENTION_DURATION;
        this.retentionNbCycles = parseRetentionCycles(retention);
        RollCycle rollCycle = getRollCycle(retention);

        queues = new ArrayList<>(this.nbQueues);
        log.debug(String.format("%s chronicle mqueue: %s, path: %s, size: %d",
                (size == 0) ? "Opening" : "Creating", name, basePath, nbQueues));

        for (int i = 0; i < nbQueues; i++) {
            File path = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i));
            ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                                                              .rollCycle(rollCycle)
                                                              .storeFileListener(this)
                                                              .build();
            queues.add(queue);
            // touch the queue so we can count them even if they stay empty.
            queue.file().mkdirs();
        }

        // When manipulating millions of messages Java asserts must be disabled or GC on Chronicle Queues will
        // knock at the door. This does not work when running the test suite, which requires changing the
        // maven-surefire-plugin configuration to add a -da option.
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        loader.setDefaultAssertionStatus(false);
    }

    /**
     * Extracts the number of cycles from a retention duration such as {@code "4d"}.
     *
     * @throws IllegalArgumentException if the numeric part is missing or not an integer
     */
    private static int parseRetentionCycles(String retentionDuration) {
        if (retentionDuration.length() < 2) {
            throw new IllegalArgumentException("Invalid retention duration: '" + retentionDuration + "'");
        }
        String count = retentionDuration.substring(0, retentionDuration.length() - 1);
        try {
            return Integer.parseInt(count);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid retention duration: '" + retentionDuration + "'", e);
        }
    }

    /**
     * Counts the sub directories of {@code basePath} whose name starts with the queue prefix.
     *
     * @throws IllegalArgumentException if the directory cannot be listed or holds no queue
     */
    private int findNbQueues(File basePath) {
        int ret;
        try (Stream<Path> paths = Files.list(basePath.toPath())) {
            ret = (int) paths.filter(
                    path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(QUEUE_PREFIX)))
                             .count();
            if (ret == 0) {
                throw new IOException("No chronicles queues file found");
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
        return ret;
    }

    /**
     * Maps the last character of the retention duration to a Chronicle roll cycle.
     *
     * @throws IllegalArgumentException if the rolling period is not one of {@code s}, {@code m}, {@code h},
     *             {@code d}
     */
    private RollCycle getRollCycle(String retentionDuration) {
        String rollingPeriod = retentionDuration.substring(retentionDuration.length() - 1);
        switch (rollingPeriod) {
        case SECOND_ROLLING_PERIOD:
            return RollCycles.TEST_SECONDLY;
        case MINUTE_ROLLING_PERIOD:
            return RollCycles.MINUTELY;
        case HOUR_ROLLING_PERIOD:
            return RollCycles.HOURLY;
        case DAY_ROLLING_PERIOD:
            return RollCycles.DAILY;
        default:
            String msg = "Unknown rolling period: " + rollingPeriod + " for MQueue: " + name();
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
    }

    /**
     * Returns the partition index encoded in the name of the directory holding {@code queueFile}.
     */
    private int findQueueIndex(File queueFile) {
        String queueDirName = queueFile.getParentFile().getName();
        // parse everything after the prefix: taking only the last 2 digits breaks with 100 partitions or more
        return Integer.parseInt(queueDirName.substring(QUEUE_PREFIX.length()));
    }

    /**
     * Called when a store file is acquired; purges the cycle files of the same queue that are older than
     * {@code cycle} minus the number of retained cycles.
     */
    @Override
    public void onAcquired(int cycle, File file) {
        if (log.isDebugEnabled()) {
            log.debug("New file created: " + file + " on cycle: " + cycle);
        }

        SingleChronicleQueue queue = (SingleChronicleQueue) queues.get(findQueueIndex(file));

        int lowerCycle = queue.firstCycle();
        int upperCycle = cycle - retentionNbCycles;

        purgeQueue(lowerCycle, upperCycle, queue);
    }

    /**
     * Files in queue older than the current date minus the retention duration are candidates for purging, knowing
     * that the more recent files should be kept to ensure no data loss (for example after an interruption longer
     * than the retention duration).
     *
     * @param lowerCycle the first cycle of the queue
     * @param upperCycle the cycle below which files can be deleted
     * @param queue the queue to purge
     */
    protected void purgeQueue(int lowerCycle, int upperCycle, SingleChronicleQueue queue) {
        // TODO this method should be refactored after chronicle-queue lib upgrade
        if (lowerCycle >= upperCycle) {
            return;
        }
        // only cycle files can be converted to a cycle number by the cache
        File[] files = queue.file().listFiles((dir, fileName) -> fileName.endsWith(SUFFIX));
        if (files == null) {
            return;
        }
        // cycles without message have no file, so there can be fewer files than retained cycles;
        // Stream#limit rejects a negative size
        int purgeable = files.length - retentionNbCycles;
        if (purgeable <= 0) {
            return;
        }

        RollingResourcesCache cache = new RollingResourcesCache(queue.rollCycle(), queue.epoch(),
                cycleName -> new File(queue.file().getAbsolutePath(), cycleName + SUFFIX),
                f -> FilenameUtils.removeExtension(f.getName()));

        Arrays.stream(files)
              .sorted(Comparator.comparingLong(cache::toLong)) // Order files by cycles
              .limit(purgeable) // Keep the 'retentionNbCycles' more recent files
              .filter(f -> cache.parseCount(FilenameUtils.removeExtension(f.getName())) < upperCycle)
              .forEach(f -> {
                  if (f.delete()) {
                      log.info("Queue file deleted: " + f.getAbsolutePath());
                  } else {
                      log.warn("Unable to delete queue file: " + f.getAbsolutePath());
                  }
              });
    }

    @Override
    public void onReleased(int cycle, File file) {
        // nothing to do when a store file is released
    }
}