001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer;
020
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.Callable;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.lib.stream.pattern.Message;
029import org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool;
030import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;
031import org.nuxeo.lib.stream.log.LogManager;
032import org.nuxeo.lib.stream.log.LogPartition;
033import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
034
035/**
036 * Run a pool of ConsumerRunner.
037 *
038 * @since 9.1
039 */
040public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
041    private static final Log log = LogFactory.getLog(ConsumerPool.class);
042
043    protected final LogManager manager;
044
045    protected final ConsumerFactory<M> factory;
046
047    protected final ConsumerPolicy policy;
048
049    protected final String logName;
050
051    protected final List<List<LogPartition>> defaultAssignments;
052
053    public ConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, ConsumerPolicy policy) {
054        super(computeNbThreads((short) manager.getAppender(logName).size(), policy.getMaxThreads()));
055        this.logName = logName;
056        this.manager = manager;
057        this.factory = factory;
058        this.policy = policy;
059        this.defaultAssignments = getDefaultAssignments();
060        if (manager.supportSubscribe()) {
061            log.info("Creating consumer pool using Log subscribe on " + logName);
062        } else {
063            log.info("Creating consumer pool using Log assignments on " + logName + ": " + defaultAssignments);
064        }
065    }
066
067    protected static short computeNbThreads(short maxConcurrency, short maxThreads) {
068        if (maxThreads > 0) {
069            return (short) Math.min(maxConcurrency, maxThreads);
070        }
071        return maxConcurrency;
072    }
073
074    public String getConsumerGroupName() {
075        return policy.getName();
076    }
077
078    @Override
079    protected ConsumerStatus getErrorStatus() {
080        return new ConsumerStatus("error", 0, 0, 0, 0, 0, 0, true);
081    }
082
083    @Override
084    protected Callable<ConsumerStatus> getCallable(int i) {
085        return new ConsumerRunner<>(factory, policy, manager, defaultAssignments.get(i));
086    }
087
088    @Override
089    protected String getThreadPrefix() {
090        return "Nuxeo-Consumer";
091    }
092
093    @Override
094    protected void afterCall(List<ConsumerStatus> ret) {
095        ret.forEach(log::info);
096        log.warn(ConsumerStatus.toString(ret));
097    }
098
099    protected List<List<LogPartition>> getDefaultAssignments() {
100        Map<String, Integer> streams = Collections.singletonMap(logName, manager.getAppender(logName).size());
101        return KafkaUtils.roundRobinAssignments(getNbThreads(), streams);
102    }
103
104}