001package org.nuxeo.ecm.platform.importer.queue.manager;/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * bdelbosc 016 */ 017 018 019import net.openhft.chronicle.queue.ChronicleQueue; 020import net.openhft.chronicle.queue.ExcerptAppender; 021import net.openhft.chronicle.queue.ExcerptTailer; 022import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; 023import org.jetbrains.annotations.Nullable; 024import org.nuxeo.common.utils.ExceptionUtils; 025import org.nuxeo.ecm.platform.importer.log.ImporterLogger; 026import org.nuxeo.ecm.platform.importer.source.SourceNode; 027 028import java.io.File; 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Objects; 033import java.util.concurrent.TimeUnit; 034 035import static org.apache.commons.io.FileUtils.deleteDirectory; 036 037/** 038 * @since 8.10 039 */ 040public class CQManager extends AbstractQueuesManager { 041 042 final List<ChronicleQueue> queues; 043 final List<ExcerptAppender> appenders; 044 final List<ExcerptTailer> tailers; 045 046 public CQManager(ImporterLogger logger, int queuesNb) { 047 this(logger, queuesNb, false); 048 } 049 050 public CQManager(ImporterLogger logger, int queuesNb, boolean append) { 051 this(new File(System.getProperty("java.io.tmpdir"), "CQ"), logger, queuesNb, append); 052 } 053 054 public CQManager(File basePath, ImporterLogger logger, int queuesNb, boolean append) { 055 super(logger, queuesNb); 056 queues = new ArrayList<>(queuesNb); 057 appenders = new ArrayList<>(queuesNb); 058 tailers = new ArrayList<>(queuesNb); 059 if (!append) { 060 try { 061 logger.info("Clearing previous queues in: " + basePath); 062 deleteDirectory(basePath); 063 } catch (IOException e) { 064 log.error(e.getMessage(), e); 065 } 066 } 067 logger.info("Using chronicle queues in: " + basePath); 068 basePath.mkdirs(); 069 070 for (int i = 0; i < queuesNb; i++) { 071 File path = new File(basePath, "Q" + i); 072 ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path).build(); 073 appenders.add(queue.acquireAppender()); 074 tailers.add(queue.createTailer().toEnd()); 075 } 076 } 077 078 @Override 079 public void put(int queue, SourceNode node) throws InterruptedException { 080 appenders.get(queue).writeDocument(w -> w.write("node").object(node)); 081 } 082 083 @Override 084 public SourceNode poll(int queue) { 085 try { 086 return poll(queue, 5, TimeUnit.MINUTES); 087 } catch (InterruptedException e) { 088 log.error("poll timeout", e); 089 ExceptionUtils.checkInterrupt(e); 090 } 091 return null; 092 } 093 094 @Nullable 095 private SourceNode get(int queue) { 096 final SourceNode[] ret = new SourceNode[1]; 097 if (tailers.get(queue).readDocument(w -> { 098 ret[0] = (SourceNode) w.read("node").object(); 099 })) { 100 return ret[0]; 101 } 102 return null; 103 } 104 105 @Override 106 public SourceNode poll(int queue, long timeout, TimeUnit unit) throws InterruptedException { 107 SourceNode ret = get(queue); 108 if (ret != null) { 109 return ret; 110 } 111 final long deadline = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, unit); 112 while (ret == null && System.currentTimeMillis() < deadline) { 113 Thread.sleep(100); 114 ret = get(queue); 115 } 116 return ret; 117 } 118 119 @Override 120 public boolean isEmpty(int queue) { 121 return !tailers.get(queue).readingDocument().isPresent(); 122 } 123 124 @Override 125 public int size(int queue) { 126 return 0; 127 } 128 129 @Override 130 public void close() { 131 queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close); 132 appenders.clear(); 133 tailers.clear(); 134 queues.clear(); 135 } 136}