001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;
025
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Objects;
029import java.util.concurrent.Callable;
030
031/**
032 * Run a pool of ConsumerRunner.
033 *
034 * @since 9.1
035 */
036public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
037    private static final Log log = LogFactory.getLog(ConsumerPool.class);
038    private final MQueues<M> qm;
039    private final ConsumerFactory<M> factory;
040    private final ConsumerPolicy policy;
041    private final List<MQueues.Tailer<M>> tailers;
042
043    public ConsumerPool(MQueues<M> qm, ConsumerFactory<M> factory, ConsumerPolicy policy) {
044        super(qm.size());
045        this.qm = qm;
046        this.factory = factory;
047        this.policy = policy;
048        this.tailers = new ArrayList<>(qm.size());
049    }
050
051    @Override
052    protected ConsumerStatus getErrorStatus() {
053        return new ConsumerStatus(0, 0, 0, 0, 0, 0, 0, true);
054    }
055
056    @Override
057    protected Callable<ConsumerStatus> getCallable(int i) {
058        MQueues.Tailer<M> tailer = qm.createTailer(i);
059        tailers.add(tailer);
060        return new ConsumerRunner<>(factory, policy, tailer);
061    }
062
063    @Override
064    protected String getThreadPrefix() {
065        return "Nuxeo-Consumer";
066    }
067
068    @Override
069    protected void afterCall(List<ConsumerStatus> ret) {
070        closeTailers();
071        ret.forEach(log::info);
072        log.warn(ConsumerStatus.toString(ret));
073    }
074
075    private void closeTailers() {
076        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
077            try {
078                tailer.close();
079            } catch (Exception e) {
080                log.error("Unable to close tailer: " + tailer.getQueue());
081            }
082        });
083    }
084
085    @Override
086    public void close() throws Exception {
087        super.close();
088        closeTailers();
089    }
090}