001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.pattern.consumer; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.Callable; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.lib.stream.codec.Codec; 031import org.nuxeo.lib.stream.log.LogManager; 032import org.nuxeo.lib.stream.log.LogPartition; 033import org.nuxeo.lib.stream.log.Name; 034import org.nuxeo.lib.stream.log.kafka.KafkaUtils; 035import org.nuxeo.lib.stream.pattern.Message; 036import org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool; 037import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner; 038 039/** 040 * Run a pool of ConsumerRunner. 041 * 042 * @since 9.1 043 */ 044public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> { 045 private static final Log log = LogFactory.getLog(ConsumerPool.class); 046 047 protected final String logName; 048 049 protected final LogManager manager; 050 051 protected final Codec<M> codec; 052 053 protected final ConsumerFactory<M> factory; 054 055 protected final ConsumerPolicy policy; 056 057 protected final List<List<LogPartition>> defaultAssignments; 058 059 /** 060 * @deprecated since 11.1, due to serialization issue with java 11, use 061 * {@link #ConsumerPool(String, LogManager, Codec, ConsumerFactory, ConsumerPolicy)} which allows to 062 * give a {@link org.nuxeo.lib.stream.codec.Codec codec} to {@link org.nuxeo.lib.stream.log.LogTailer 063 * tailer}. 064 */ 065 @Deprecated 066 @SuppressWarnings("unchecked") 067 public ConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, ConsumerPolicy policy) { 068 this(logName, manager, NO_CODEC, factory, policy); 069 } 070 071 public ConsumerPool(String logName, LogManager manager, Codec<M> codec, ConsumerFactory<M> factory, 072 ConsumerPolicy policy) { 073 super(computeNbThreads((short) manager.getAppender(Name.ofUrn(logName)).size(), policy.getMaxThreads())); 074 this.logName = logName; 075 this.manager = manager; 076 this.codec = codec; 077 this.factory = factory; 078 this.policy = policy; 079 this.defaultAssignments = getDefaultAssignments(); 080 if (manager.supportSubscribe()) { 081 log.info("Creating consumer pool using Log subscribe on " + logName); 082 } else { 083 log.info("Creating consumer pool using Log assignments on " + logName + ": " + defaultAssignments); 084 } 085 } 086 087 protected static short computeNbThreads(short maxConcurrency, short maxThreads) { 088 if (maxThreads > 0) { 089 return (short) Math.min(maxConcurrency, maxThreads); 090 } 091 return maxConcurrency; 092 } 093 094 public Name getConsumerGroupName() { 095 return Name.ofUrn(policy.getName()); 096 } 097 098 @Override 099 protected ConsumerStatus getErrorStatus() { 100 return new ConsumerStatus("error", 0, 0, 0, 0, 0, 0, true); 101 } 102 103 @Override 104 protected Callable<ConsumerStatus> getCallable(int i) { 105 return new ConsumerRunner<>(factory, policy, manager, codec, defaultAssignments.get(i)); 106 } 107 108 @Override 109 protected String getThreadPrefix() { 110 return "Nuxeo-Consumer"; 111 } 112 113 @Override 114 protected void afterCall(List<ConsumerStatus> ret) { 115 ret.forEach(log::info); 116 log.warn(ConsumerStatus.toString(ret)); 117 } 118 119 protected List<List<LogPartition>> getDefaultAssignments() { 120 Map<String, Integer> streams = Collections.singletonMap(logName, 121 manager.getAppender(Name.ofUrn(logName)).size()); 122 return KafkaUtils.roundRobinAssignments(getNbThreads(), streams); 123 } 124 125}