/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.pattern.consumer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.pattern.Message;
import org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool;
import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
 * Run a pool of {@link ConsumerRunner} reading from a single Log.
 * <p>
 * The pool size is derived from the size reported by the Log's appender, optionally capped by
 * {@link ConsumerPolicy#getMaxThreads()}. One list of {@link LogPartition} is precomputed per pool slot and handed to
 * the runner created for that slot.
 *
 * @param <M> the type of message handled by the consumers
 * @since 9.1
 */
public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerPool.class);

    protected final LogManager manager;

    protected final ConsumerFactory<M> factory;

    protected final ConsumerPolicy policy;

    protected final String logName;

    /** Partition lists computed once at construction time, indexed by pool slot. */
    protected final List<List<LogPartition>> defaultAssignments;

    /**
     * Creates a pool sized from the appender size of {@code logName} and the policy's max threads.
     *
     * @param logName the name of the Log to consume
     * @param manager the manager giving access to the Log
     * @param factory the factory passed to each {@link ConsumerRunner}
     * @param policy the policy passed to each {@link ConsumerRunner}; also provides the thread cap and the name
     *            returned by {@link #getConsumerGroupName()}
     */
    public ConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, ConsumerPolicy policy) {
        super(computeNbThreads((short) manager.getAppender(logName).size(), policy.getMaxThreads()));
        this.manager = manager;
        this.factory = factory;
        this.policy = policy;
        this.logName = logName;
        // Must come after logName and manager are set: the computation reads both.
        this.defaultAssignments = getDefaultAssignments();
        String creationMessage = manager.supportSubscribe()
                ? "Creating consumer pool using Log subscribe on " + logName
                : "Creating consumer pool using Log assignments on " + logName + ": " + defaultAssignments;
        log.info(creationMessage);
    }

    /**
     * Picks the number of threads for the pool.
     *
     * @param maxConcurrency the upper bound imposed by the Log
     * @param maxThreads the cap requested by the policy; a value of zero or less means "no cap"
     * @return {@code maxConcurrency} when no cap is requested, otherwise the smaller of the two values
     */
    protected static short computeNbThreads(short maxConcurrency, short maxThreads) {
        if (maxThreads <= 0) {
            return maxConcurrency;
        }
        return (short) Math.min(maxConcurrency, maxThreads);
    }

    /**
     * @return the name of the policy, used as the consumer group name
     */
    public String getConsumerGroupName() {
        return policy.getName();
    }

    /**
     * @return a status built with the {@code "error"} identifier, all numeric arguments at zero and the trailing flag
     *         set to {@code true}
     */
    @Override
    protected ConsumerStatus getErrorStatus() {
        return new ConsumerStatus("error", 0, 0, 0, 0, 0, 0, true);
    }

    /**
     * Builds the runner for slot {@code i}, giving it the {@code i}-th precomputed partition list.
     */
    @Override
    protected Callable<ConsumerStatus> getCallable(int i) {
        List<LogPartition> assignment = defaultAssignments.get(i);
        return new ConsumerRunner<>(factory, policy, manager, assignment);
    }

    @Override
    protected String getThreadPrefix() {
        return "Nuxeo-Consumer";
    }

    /**
     * Logs every individual status at info level, then the combined representation at warn level.
     */
    @Override
    protected void afterCall(List<ConsumerStatus> ret) {
        for (ConsumerStatus status : ret) {
            log.info(status);
        }
        String summary = ConsumerStatus.toString(ret);
        log.warn(summary);
    }

    /**
     * Computes the partition lists by passing the pool's thread count and the appender size of the Log to
     * {@link KafkaUtils#roundRobinAssignments}.
     *
     * @return the partition lists returned by the round-robin helper
     */
    protected List<List<LogPartition>> getDefaultAssignments() {
        int logSize = manager.getAppender(logName).size();
        Map<String, Integer> streams = Collections.singletonMap(logName, logSize);
        return KafkaUtils.roundRobinAssignments(getNbThreads(), streams);
    }

}