001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.mqueues; 020 021import net.openhft.chronicle.queue.ChronicleQueue; 022import net.openhft.chronicle.queue.ExcerptAppender; 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.nuxeo.ecm.platform.importer.mqueues.message.Message; 026 027import java.io.File; 028import java.io.IOException; 029import java.nio.file.Files; 030import java.nio.file.Path; 031import java.time.Duration; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Objects; 035import java.util.concurrent.ConcurrentLinkedQueue; 036import java.util.stream.Stream; 037 038import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary; 039import static org.apache.commons.io.FileUtils.deleteDirectory; 040import static org.nuxeo.ecm.platform.importer.mqueues.mqueues.CQTailer.DEFAULT_OFFSET_NAMESPACE; 041 042/** 043 * Chronicle Queue implementation of MQueues. 044 * 045 * Note that for performance reason the class loader assertion are disabled. 046 * 047 * @since 9.1 048 */ 049public class CQMQueues<M extends Message> implements MQueues<M> { 050 private static final Log log = LogFactory.getLog(CQMQueues.class); 051 private static final String QUEUE_PREFIX = "Q-"; 052 private static final int POLL_INTERVAL_MS = 100; 053 054 private final List<ChronicleQueue> queues; 055 private final int nbQueues; 056 private final File basePath; 057 058 // keep track of created tailers to make sure they are closed before the mq 059 private final ConcurrentLinkedQueue<CQTailer<M>> tailers = new ConcurrentLinkedQueue<>(); 060 061 /** 062 * Create a new mqueues. Warning this will ERASE the basePath if not empty. 063 */ 064 public CQMQueues(File basePath, int size) { 065 this(basePath, size, false); 066 } 067 068 /** 069 * Open an existing mqueues. 070 */ 071 public CQMQueues(File basePath) { 072 this(basePath, 0, true); 073 } 074 075 076 @Override 077 public int size() { 078 return nbQueues; 079 } 080 081 @Override 082 public CQOffset append(int queue, M message) { 083 ExcerptAppender appender = queues.get(queue).acquireAppender(); 084 appender.writeDocument(w -> w.write("msg").object(message)); 085 return new CQOffset(queue, appender.lastIndexAppended()); 086 } 087 088 @Override 089 public Tailer<M> createTailer(int queue) { 090 return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue)); 091 } 092 093 @Override 094 public Tailer<M> createTailer(int queue, String name) { 095 return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue, name)); 096 } 097 098 private Tailer<M> addTailer(CQTailer<M> tailer) { 099 tailers.add(tailer); 100 return tailer; 101 } 102 103 @Override 104 public boolean waitFor(Offset offset, Duration timeout) throws InterruptedException { 105 boolean ret; 106 long offsetPosition = ((CQOffset) offset).getOffset(); 107 int queue = ((CQOffset) offset).getQueue(); 108 try (CQOffsetTracker offsetTracker = new CQOffsetTracker(basePath.toString(), queue, DEFAULT_OFFSET_NAMESPACE)) { 109 ret = isProcessed(offsetTracker, offsetPosition); 110 if (ret) { 111 return true; 112 } 113 final long timeoutMs = timeout.toMillis(); 114 final long deadline = System.currentTimeMillis() + timeoutMs; 115 final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs); 116 while (!ret && System.currentTimeMillis() < deadline) { 117 Thread.sleep(delay); 118 ret = isProcessed(offsetTracker, offsetPosition); 119 } 120 } 121 return ret; 122 } 123 124 private boolean isProcessed(CQOffsetTracker tracker, long offset) { 125 long last = tracker.readLastCommittedOffset(); 126 return (last > 0) && (last >= offset); 127 } 128 129 130 @Override 131 public void close() throws Exception { 132 log.debug("Closing queue"); 133 tailers.stream().filter(Objects::nonNull).forEach(tailer -> { 134 try { 135 tailer.close(); 136 } catch (Exception e) { 137 log.error("Failed to close tailer: " + tailer); 138 } 139 }); 140 tailers.clear(); 141 queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close); 142 queues.clear(); 143 } 144 145 private CQMQueues(File basePath, int size, boolean append) { 146 if (!append) { 147 resetBasePath(basePath); 148 this.nbQueues = size; 149 } else { 150 this.nbQueues = findNbQueues(basePath); 151 } 152 this.basePath = basePath; 153 queues = new ArrayList<>(this.nbQueues); 154 log.info("Using chronicle queues in: " + basePath); 155 156 for (int i = 0; i < this.nbQueues; i++) { 157 File path = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i)); 158 ChronicleQueue queue = binary(path).build(); 159 queues.add(queue); 160 // touch the queue so we can count them even if they stay empty. 161 queue.file().mkdirs(); 162 } 163 164 // When manipulating millions of messages java assert must be disable or GC on Chronicle Queues will knock at the door 165 // also this does not work when running test suite, it requires to change the maven-surefire-plugin conf to add a -da option 166 ClassLoader loader = ClassLoader.getSystemClassLoader(); 167 loader.setDefaultAssertionStatus(false); 168 } 169 170 private int findNbQueues(File basePath) { 171 int ret; 172 try (Stream<Path> paths = Files.list(basePath.toPath())) { 173 ret = (int) paths.filter(path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(QUEUE_PREFIX))).count(); 174 if (ret == 0) { 175 throw new IOException("No chronicles queues file found"); 176 } 177 } catch (IOException e) { 178 throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e); 179 } 180 return ret; 181 } 182 183 private void resetBasePath(File basePath) { 184 if (basePath.isDirectory()) { 185 deleteQueueBasePath(basePath); 186 } 187 if (!basePath.mkdirs()) { 188 String msg = "Can not create Chronicle Queues in: " + basePath; 189 log.error(msg); 190 throw new IllegalArgumentException(msg); 191 } 192 } 193 194 private void deleteQueueBasePath(File basePath) { 195 try { 196 log.info("Removing Chronicle Queues directory: " + basePath); 197 // Performs a recursive delete if the directory contains only chronicles files 198 try (Stream<Path> paths = Files.list(basePath.toPath())) { 199 int count = (int) paths.filter(path -> (Files.isRegularFile(path) && !path.toString().endsWith(".cq4"))).count(); 200 if (count > 0) { 201 String msg = "CQMQueues basePath: " + basePath + " contains unkown files, please choose another basePath"; 202 log.error(msg); 203 throw new IllegalArgumentException(msg); 204 } 205 } 206 deleteDirectory(basePath); 207 } catch (IOException e) { 208 String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage(); 209 log.error(msg, e); 210 throw new IllegalArgumentException(msg, e); 211 } 212 } 213 214 215}