001package org.nuxeo.ecm.platform.importer.queue.manager;/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bdelbosc
016 */
017
018import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
019import org.nuxeo.ecm.platform.importer.source.SourceNode;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Objects;
025import java.util.concurrent.ArrayBlockingQueue;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.TimeUnit;
028
029/**
030 * Blocking Queues Manager, in memory queues
031 * @since 8.10
032 */
033public class BQManager extends AbstractQueuesManager {
034
035    final List<BlockingQueue<SourceNode>> queues;
036
037    protected final int maxQueueSize;
038
039    public BQManager(ImporterLogger logger, int queuesNb, int maxQueueSize) {
040        super(logger, queuesNb);
041        this.maxQueueSize = maxQueueSize;
042        queues = new ArrayList<>(queuesNb);
043        for (int i = 0; i < queuesNb; i++) {
044            queues.add(new ArrayBlockingQueue<>(maxQueueSize));
045        }
046    }
047
048    @Override
049    public boolean isEmpty(int queue) {
050        return queues.get(queue).isEmpty();
051    }
052
053    @Override
054    public SourceNode poll(int queue, long timeout, TimeUnit unit) throws InterruptedException {
055        return queues.get(queue).poll(timeout, unit);
056    }
057
058    @Override
059    public int size(int queue) {
060        return queues.get(queue).size();
061    }
062
063    @Override
064    public SourceNode poll(int queue) {
065        return queues.get(queue).poll();
066    }
067
068    @Override
069    public void put(int queue, SourceNode node) throws InterruptedException {
070        queues.get(queue).put(node);
071    }
072
073    @Override
074    public void close() {
075        queues.stream().filter(Objects::nonNull).forEach(BlockingQueue::clear);
076        queues.clear();
077    }
078}