001package org.nuxeo.ecm.platform.importer.queue.manager;/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bdelbosc
016 */
017
018
019import net.openhft.chronicle.queue.ChronicleQueue;
020import net.openhft.chronicle.queue.ExcerptAppender;
021import net.openhft.chronicle.queue.ExcerptTailer;
022import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
023import org.jetbrains.annotations.Nullable;
024import org.nuxeo.common.utils.ExceptionUtils;
025import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
026import org.nuxeo.ecm.platform.importer.source.SourceNode;
027
028import java.io.File;
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Objects;
033import java.util.concurrent.TimeUnit;
034
035import static org.apache.commons.io.FileUtils.deleteDirectory;
036
037/**
038 * @since 8.10
039 */
040public class CQManager extends AbstractQueuesManager {
041
042    final List<ChronicleQueue> queues;
043    final List<ExcerptAppender> appenders;
044    final List<ExcerptTailer> tailers;
045
046    public CQManager(ImporterLogger logger, int queuesNb) {
047        this(logger, queuesNb, false);
048    }
049
050    public CQManager(ImporterLogger logger, int queuesNb, boolean append) {
051        this(new File(System.getProperty("java.io.tmpdir"), "CQ"), logger, queuesNb, append);
052    }
053
054    public CQManager(File basePath, ImporterLogger logger, int queuesNb, boolean append) {
055        super(logger, queuesNb);
056        queues = new ArrayList<>(queuesNb);
057        appenders = new ArrayList<>(queuesNb);
058        tailers = new ArrayList<>(queuesNb);
059        if (!append) {
060            try {
061                logger.info("Clearing previous queues in: " + basePath);
062                deleteDirectory(basePath);
063            } catch (IOException e) {
064                log.error(e.getMessage(), e);
065            }
066        }
067        logger.info("Using chronicle queues in: " + basePath);
068        basePath.mkdirs();
069
070        for (int i = 0; i < queuesNb; i++) {
071            File path = new File(basePath, "Q" + i);
072            ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path).build();
073            appenders.add(queue.acquireAppender());
074            tailers.add(queue.createTailer().toEnd());
075        }
076    }
077
078    @Override
079    public void put(int queue, SourceNode node) throws InterruptedException {
080        appenders.get(queue).writeDocument(w -> w.write("node").object(node));
081    }
082
083    @Override
084    public SourceNode poll(int queue) {
085        try {
086            return poll(queue, 5, TimeUnit.MINUTES);
087        } catch (InterruptedException e) {
088            log.error("poll timeout", e);
089            ExceptionUtils.checkInterrupt(e);
090        }
091        return null;
092    }
093
094    @Nullable
095    private SourceNode get(int queue) {
096        final SourceNode[] ret = new SourceNode[1];
097        if (tailers.get(queue).readDocument(w -> {
098            ret[0] = (SourceNode) w.read("node").object();
099        })) {
100            return ret[0];
101        }
102        return null;
103    }
104
105    @Override
106    public SourceNode poll(int queue, long timeout, TimeUnit unit) throws InterruptedException {
107        SourceNode ret = get(queue);
108        if (ret != null) {
109            return ret;
110        }
111        final long deadline = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, unit);
112        while (ret == null && System.currentTimeMillis() < deadline) {
113            Thread.sleep(100);
114            ret = get(queue);
115        }
116        return ret;
117    }
118
119    @Override
120    public boolean isEmpty(int queue) {
121        return !tailers.get(queue).readingDocument().isPresent();
122    }
123
124    @Override
125    public int size(int queue) {
126        return 0;
127    }
128
129    @Override
130    public void close() {
131        queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
132        appenders.clear();
133        tailers.clear();
134        queues.clear();
135    }
136}