001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues;
020
021import net.openhft.chronicle.queue.ChronicleQueue;
022import net.openhft.chronicle.queue.ExcerptAppender;
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
026
027import java.io.File;
028import java.io.IOException;
029import java.nio.file.Files;
030import java.nio.file.Path;
031import java.time.Duration;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Objects;
035import java.util.concurrent.ConcurrentLinkedQueue;
036import java.util.stream.Stream;
037
038import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder.binary;
039import static org.apache.commons.io.FileUtils.deleteDirectory;
040import static org.nuxeo.ecm.platform.importer.mqueues.mqueues.CQTailer.DEFAULT_OFFSET_NAMESPACE;
041
042/**
043 * Chronicle Queue implementation of MQueues.
044 *
045 * Note that for performance reason the class loader assertion are disabled.
046 *
047 * @since 9.1
048 */
049public class CQMQueues<M extends Message> implements MQueues<M> {
050    private static final Log log = LogFactory.getLog(CQMQueues.class);
051    private static final String QUEUE_PREFIX = "Q-";
052    private static final int POLL_INTERVAL_MS = 100;
053
054    private final List<ChronicleQueue> queues;
055    private final int nbQueues;
056    private final File basePath;
057
058    // keep track of created tailers to make sure they are closed before the mq
059    private final ConcurrentLinkedQueue<CQTailer<M>> tailers = new ConcurrentLinkedQueue<>();
060
061    /**
062     * Create a new mqueues. Warning this will ERASE the basePath if not empty.
063     */
064    public CQMQueues(File basePath, int size) {
065        this(basePath, size, false);
066    }
067
068    /**
069     * Open an existing mqueues.
070     */
071    public CQMQueues(File basePath) {
072        this(basePath, 0, true);
073    }
074
075
076    @Override
077    public int size() {
078        return nbQueues;
079    }
080
081    @Override
082    public CQOffset append(int queue, M message) {
083        ExcerptAppender appender = queues.get(queue).acquireAppender();
084        appender.writeDocument(w -> w.write("msg").object(message));
085        return new CQOffset(queue, appender.lastIndexAppended());
086    }
087
088    @Override
089    public Tailer<M> createTailer(int queue) {
090        return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue));
091    }
092
093    @Override
094    public Tailer<M> createTailer(int queue, String name) {
095        return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue, name));
096    }
097
098    private Tailer<M> addTailer(CQTailer<M> tailer) {
099        tailers.add(tailer);
100        return tailer;
101    }
102
103    @Override
104    public boolean waitFor(Offset offset, Duration timeout) throws InterruptedException {
105        boolean ret;
106        long offsetPosition = ((CQOffset) offset).getOffset();
107        int queue = ((CQOffset) offset).getQueue();
108        try (CQOffsetTracker offsetTracker = new CQOffsetTracker(basePath.toString(), queue, DEFAULT_OFFSET_NAMESPACE)) {
109            ret = isProcessed(offsetTracker, offsetPosition);
110            if (ret) {
111                return true;
112            }
113            final long timeoutMs = timeout.toMillis();
114            final long deadline = System.currentTimeMillis() + timeoutMs;
115            final long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
116            while (!ret && System.currentTimeMillis() < deadline) {
117                Thread.sleep(delay);
118                ret = isProcessed(offsetTracker, offsetPosition);
119            }
120        }
121        return ret;
122    }
123
124    private boolean isProcessed(CQOffsetTracker tracker, long offset) {
125        long last = tracker.readLastCommittedOffset();
126        return (last > 0) && (last >= offset);
127    }
128
129
130    @Override
131    public void close() throws Exception {
132        log.debug("Closing queue");
133        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
134            try {
135                tailer.close();
136            } catch (Exception e) {
137                log.error("Failed to close tailer: " + tailer);
138            }
139        });
140        tailers.clear();
141        queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
142        queues.clear();
143    }
144
145    private CQMQueues(File basePath, int size, boolean append) {
146        if (!append) {
147            resetBasePath(basePath);
148            this.nbQueues = size;
149        } else {
150            this.nbQueues = findNbQueues(basePath);
151        }
152        this.basePath = basePath;
153        queues = new ArrayList<>(this.nbQueues);
154        log.info("Using chronicle queues in: " + basePath);
155
156        for (int i = 0; i < this.nbQueues; i++) {
157            File path = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i));
158            ChronicleQueue queue = binary(path).build();
159            queues.add(queue);
160            // touch the queue so we can count them even if they stay empty.
161            queue.file().mkdirs();
162        }
163
164        // When manipulating millions of messages java assert must be disable or GC on Chronicle Queues will knock at the door
165        // also this does not work when running test suite, it requires to change the maven-surefire-plugin conf to add a -da option
166        ClassLoader loader = ClassLoader.getSystemClassLoader();
167        loader.setDefaultAssertionStatus(false);
168    }
169
170    private int findNbQueues(File basePath) {
171        int ret;
172        try (Stream<Path> paths = Files.list(basePath.toPath())) {
173            ret = (int) paths.filter(path -> (Files.isDirectory(path) && path.getFileName().toString().startsWith(QUEUE_PREFIX))).count();
174            if (ret == 0) {
175                throw new IOException("No chronicles queues file found");
176            }
177        } catch (IOException e) {
178            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
179        }
180        return ret;
181    }
182
183    private void resetBasePath(File basePath) {
184        if (basePath.isDirectory()) {
185            deleteQueueBasePath(basePath);
186        }
187        if (!basePath.mkdirs()) {
188            String msg = "Can not create Chronicle Queues in: " + basePath;
189            log.error(msg);
190            throw new IllegalArgumentException(msg);
191        }
192    }
193
194    private void deleteQueueBasePath(File basePath) {
195        try {
196            log.info("Removing Chronicle Queues directory: " + basePath);
197            // Performs a recursive delete if the directory contains only chronicles files
198            try (Stream<Path> paths = Files.list(basePath.toPath())) {
199                int count = (int) paths.filter(path -> (Files.isRegularFile(path) && !path.toString().endsWith(".cq4"))).count();
200                if (count > 0) {
201                    String msg = "CQMQueues basePath: " + basePath + " contains unkown files, please choose another basePath";
202                    log.error(msg);
203                    throw new IllegalArgumentException(msg);
204                }
205            }
206            deleteDirectory(basePath);
207        } catch (IOException e) {
208            String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
209            log.error(msg, e);
210            throw new IllegalArgumentException(msg, e);
211        }
212    }
213
214
215}