001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;
026import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
027import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.AbstractCallablePool;
028import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.ConsumerRunner;
029
030import java.util.Collections;
031import java.util.List;
032import java.util.Map;
033import java.util.concurrent.Callable;
034
035/**
036 * Run a pool of ConsumerRunner.
037 *
038 * @since 9.1
039 */
040public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
041    private static final Log log = LogFactory.getLog(ConsumerPool.class);
042    private final MQManager<M> manager;
043    private final ConsumerFactory<M> factory;
044    private final ConsumerPolicy policy;
045    private final String mqName;
046    private final List<List<MQPartition>> defaultAssignments;
047
048    public ConsumerPool(String mqName, MQManager<M> manager, ConsumerFactory<M> factory, ConsumerPolicy policy) {
049        super(computeNbThreads((short) manager.getAppender(mqName).size(), policy.getMaxThreads()));
050        this.mqName = mqName;
051        this.manager = manager;
052        this.factory = factory;
053        this.policy = policy;
054        this.defaultAssignments = getDefaultAssignments();
055        if (manager.supportSubscribe()) {
056            log.info("Creating consumer pool using MQ subscribe on " + mqName);
057        } else {
058            log.info("Creating consumer pool using MQ assignments on " + mqName + ": " + defaultAssignments);
059        }
060    }
061
062    protected static short computeNbThreads(short maxConcurrency, short maxThreads) {
063        if (maxThreads > 0) {
064            return (short) Math.min(maxConcurrency, maxThreads);
065        }
066        return maxConcurrency;
067    }
068
069    public String getConsumerGroupName() {
070        return policy.getName();
071    }
072
073    @Override
074    protected ConsumerStatus getErrorStatus() {
075        return new ConsumerStatus("error", 0, 0, 0, 0, 0, 0, true);
076    }
077
078    @Override
079    protected Callable<ConsumerStatus> getCallable(int i) {
080        return new ConsumerRunner<>(factory, policy, manager, defaultAssignments.get(i));
081    }
082
083    @Override
084    protected String getThreadPrefix() {
085        return "Nuxeo-Consumer";
086    }
087
088    @Override
089    protected void afterCall(List<ConsumerStatus> ret) {
090        ret.forEach(log::info);
091        log.warn(ConsumerStatus.toString(ret));
092    }
093
094    @Override
095    public void close() throws Exception {
096        super.close();
097    }
098
099    private List<List<MQPartition>> getDefaultAssignments() {
100        Map<String, Integer> streams = Collections.singletonMap(mqName, manager.getAppender(mqName).size());
101        return KafkaUtils.roundRobinAssignments(getNbThreads(), streams);
102    }
103
104}