001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;
020
021import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX;
022import static org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle.ChronicleMQManager.DEFAULT_RETENTION_DURATION;
023
024import net.openhft.chronicle.queue.ChronicleQueue;
025import net.openhft.chronicle.queue.ExcerptAppender;
026import net.openhft.chronicle.queue.RollCycle;
027import net.openhft.chronicle.queue.RollCycles;
028import net.openhft.chronicle.queue.impl.RollingResourcesCache;
029import net.openhft.chronicle.queue.impl.StoreFileListener;
030import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
031import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
032import org.apache.commons.io.FilenameUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
036import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
037import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
038import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
039import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;
040
041import java.io.Externalizable;
042import java.io.File;
043import java.io.IOException;
044import java.nio.file.Files;
045import java.nio.file.Path;
046import java.time.Duration;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Comparator;
050import java.util.List;
051import java.util.Objects;
052import java.util.concurrent.ConcurrentLinkedQueue;
053import java.util.stream.Stream;
054
055/**
056 * Chronicle Queue implementation of MQAppender.
057 *
058 * Note that for performance reason the class loader assertion are disabled.
059 *
060 * @since 9.1
061 */
062public class ChronicleMQAppender<M extends Externalizable> implements MQAppender<M>, StoreFileListener {
063    private static final Log log = LogFactory.getLog(ChronicleMQAppender.class);
064    private static final String QUEUE_PREFIX = "Q-";
065    private static final int POLL_INTERVAL_MS = 100;
066
067    private static final String SECOND_ROLLING_PERIOD = "s";
068
069    private static final String MINUTE_ROLLING_PERIOD = "m";
070
071    private static final String HOUR_ROLLING_PERIOD = "h";
072
073    private static final String DAY_ROLLING_PERIOD = "d";
074
075    private final List<ChronicleQueue> queues;
076    private final int nbQueues;
077    private final File basePath;
078    private final String name;
079
080    private int retentionNbCycles;
081
082    // keep track of created tailers to make sure they are closed before the mq
083    private final ConcurrentLinkedQueue<ChronicleMQTailer<M>> tailers = new ConcurrentLinkedQueue<>();
084    private boolean closed = false;
085
086    static public boolean exists(File basePath) {
087        return basePath.isDirectory() && basePath.list().length > 0;
088    }
089
090    public String getBasePath() {
091        return basePath.getPath();
092    }
093
094    /**
095     * Create a new mqueues
096     */
097    static public <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size,
098            String retentionPolicy) {
099        return new ChronicleMQAppender<>(basePath, size, retentionPolicy);
100    }
101
102    /**
103     * Create a new mqueues.
104     */
105    static public <M extends Externalizable> ChronicleMQAppender<M> create(File basePath, int size) {
106        return new ChronicleMQAppender<>(basePath, size, DEFAULT_RETENTION_DURATION);
107    }
108
109    /**
110     * Open an existing mqueues.
111     */
112    static public <M extends Externalizable> ChronicleMQAppender<M> open(File basePath) {
113        return new ChronicleMQAppender<>(basePath, 0, DEFAULT_RETENTION_DURATION);
114    }
115
116    /**
117     * Open an existing mqueues.
118     */
119    static public <M extends Externalizable> ChronicleMQAppender<M> open(File basePath, String retentionDuration) {
120        return new ChronicleMQAppender<>(basePath, 0, retentionDuration);
121    }
122
123    @Override
124    public String name() {
125        return name;
126    }
127
128    @Override
129    public int size() {
130        return nbQueues;
131    }
132
133    @Override
134    public MQOffset append(int partition, M message) {
135        ExcerptAppender appender = queues.get(partition).acquireAppender();
136        appender.writeDocument(w -> w.write("msg").object(message));
137        long offset = appender.lastIndexAppended();
138        MQOffset ret = new MQOffsetImpl(name, partition, offset);
139        if (log.isDebugEnabled()) {
140            log.debug(String.format("append to %s, value: %s", ret, message));
141        }
142        return ret;
143    }
144
145    public MQTailer<M> createTailer(MQPartition partition, String group) {
146        return addTailer(new ChronicleMQTailer<>(basePath.toString(),
147                queues.get(partition.partition()).createTailer(), partition, group));
148    }
149
150    private MQTailer<M> addTailer(ChronicleMQTailer<M> tailer) {
151        tailers.add(tailer);
152        return tailer;
153    }
154
155    @Override
156    public boolean waitFor(MQOffset offset, String group, Duration timeout) throws InterruptedException {
157        boolean ret;
158        long offsetPosition = offset.offset();
159        int partition = offset.partition().partition();
160        try (ChronicleMQOffsetTracker offsetTracker = new ChronicleMQOffsetTracker(basePath.toString(), partition, group)) {
161            ret = isProcessed(offsetTracker, offsetPosition);
162            if (ret) {
163                return true;
164            }
165            final long timeoutMs = timeout.toMillis();
166            final long deadline = System.currentTimeMillis() + timeoutMs;
167            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
168            while (!ret && System.currentTimeMillis() < deadline) {
169                Thread.sleep(delay);
170                ret = isProcessed(offsetTracker, offsetPosition);
171            }
172        }
173        return ret;
174    }
175
176    @Override
177    public boolean closed() {
178        return closed;
179    }
180
181    private boolean isProcessed(ChronicleMQOffsetTracker tracker, long offset) {
182        long last = tracker.readLastCommittedOffset();
183        return (last > 0) && (last >= offset);
184    }
185
186
187    @Override
188    public void close() throws Exception {
189        log.debug("Closing queue");
190        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
191            try {
192                tailer.close();
193            } catch (Exception e) {
194                log.error("Failed to close tailer: " + tailer);
195            }
196        });
197        tailers.clear();
198        queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
199        queues.clear();
200        closed = true;
201    }
202
203    private ChronicleMQAppender(File basePath, int size, String retentionDuration) {
204        if (size == 0) {
205            // open
206            if (!exists(basePath)) {
207                String msg = "Can not open Chronicle Queues, invalid path: " + basePath;
208                log.error(msg);
209                throw new IllegalArgumentException(msg);
210            }
211            this.nbQueues = findNbQueues(basePath);
212        } else {
213            // creation
214            if (exists(basePath)) {
215                String msg = "Can not create Chronicle Queues, already exists: " + basePath;
216                log.error(msg);
217                throw new IllegalArgumentException(msg);
218            }
219            if (!basePath.exists() && !basePath.mkdirs()) {
220                String msg = "Can not create Chronicle Queues in: " + basePath;
221                log.error(msg);
222                throw new IllegalArgumentException(msg);
223            }
224            this.nbQueues = size;
225        }
226        this.name = basePath.getName();
227        this.basePath = basePath;
228
229        if (retentionDuration != null) {
230            retentionNbCycles = Integer.valueOf(retentionDuration.substring(0, retentionDuration.length() - 1));
231        }
232
233        RollCycle rollCycle = getRollCycle(retentionDuration);
234
235        queues = new ArrayList<>(this.nbQueues);
236        log.debug(String.format("%s chronicle mqueue: %s, path: %s, size: %d",
237                (size == 0) ? "Opening" : "Creating", name, basePath, nbQueues));
238
239        for (int i = 0; i < nbQueues; i++) {
240            File path = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i));
241            ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
242                                                              .rollCycle(rollCycle)
243                                                              .storeFileListener(this)
244                                                              .build();
245            queues.add(queue);
246            // touch the queue so we can count them even if they stay empty.
247            queue.file().mkdirs();
248        }
249
250        // When manipulating millions of messages java assert must be disable or GC on Chronicle Queues will knock at the door
251        // also this does not work when running test suite, it requires to change the maven-surefire-plugin conf to add a -da option
252        ClassLoader loader = ClassLoader.getSystemClassLoader();
253        loader.setDefaultAssertionStatus(false);
254    }
255
256    private int findNbQueues(File basePath) {
257        int ret;
258        try (Stream<Path> paths = Files.list(basePath.toPath())) {
259            ret = (int) paths.filter(path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(QUEUE_PREFIX))).count();
260            if (ret == 0) {
261                throw new IOException("No chronicles queues file found");
262            }
263        } catch (IOException e) {
264            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
265        }
266        return ret;
267    }
268
269    private RollCycle getRollCycle(String retentionDuration) {
270        String rollingPeriod = retentionDuration.substring(retentionDuration.length() - 1);
271        RollCycle rollCycle;
272        switch (rollingPeriod) {
273        case SECOND_ROLLING_PERIOD:
274            rollCycle = RollCycles.TEST_SECONDLY;
275            break;
276        case MINUTE_ROLLING_PERIOD:
277            rollCycle = RollCycles.MINUTELY;
278            break;
279        case HOUR_ROLLING_PERIOD:
280            rollCycle = RollCycles.HOURLY;
281            break;
282        case DAY_ROLLING_PERIOD:
283            rollCycle = RollCycles.DAILY;
284            break;
285        default:
286            String msg = "Unknown rolling period: " + rollingPeriod + " for MQueue: " + name();
287            log.error(msg);
288            throw new IllegalArgumentException(msg);
289        }
290        return rollCycle;
291    }
292
293    private int findQueueIndex(File queueFile) {
294        String queueDirName = queueFile.getParentFile().getName();
295        return Integer.valueOf(queueDirName.substring(queueDirName.length() - 2));
296    }
297
298    @Override
299    public void onAcquired(int cycle, File file) {
300        if (log.isDebugEnabled()) {
301            log.debug("New file created: " + file + " on cycle: " + cycle);
302        }
303
304        SingleChronicleQueue queue = (SingleChronicleQueue) queues.get(findQueueIndex(file));
305
306        int lowerCycle = queue.firstCycle();
307        int upperCycle = cycle - retentionNbCycles;
308
309        purgeQueue(lowerCycle, upperCycle, queue);
310
311    }
312
313    /**
314     * Files in queue older than the current date minus the retention duration are candidates for purging, knowing that
315     * the more recent files should be kept to ensure no data loss (for example after an interruption longer than the
316     * retention duration).
317     */
318    protected void purgeQueue(int lowerCycle, int upperCycle, SingleChronicleQueue queue) {
319        // TODO this method should be refactored after chronicle-queue lib upgrade
320        File[] files = queue.file().listFiles();
321
322        if (files != null && lowerCycle < upperCycle) {
323            RollingResourcesCache cache = new RollingResourcesCache(queue.rollCycle(), queue.epoch(),
324                    name -> new File(queue.file().getAbsolutePath(), name + SUFFIX),
325                    f -> FilenameUtils.removeExtension(f.getName()));
326
327            Arrays.stream(files)
328                  .sorted(Comparator.comparingLong(cache::toLong)) // Order files by cycles
329                  .limit(files.length - retentionNbCycles) // Keep the 'retentionNbCycles' more recent files
330                  .filter(f -> cache.parseCount(FilenameUtils.removeExtension(f.getName())) < upperCycle)
331                  .forEach(f -> {
332                      if (f.delete()) {
333                          log.info("Queue file deleted: " + f.getAbsolutePath());
334                      }
335                  });
336        }
337    }
338
339    @Override
340    public void onReleased(int cycle, File file) {
341
342    }
343}