/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.AbstractCallablePool;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.ConsumerRunner;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Runs a pool of {@link ConsumerRunner}, one runner per pool slot.
 * <p>
 * The pool size is derived from the size reported by the MQ appender for the given MQ name
 * (presumably its number of partitions) and capped by the policy's max threads when that value
 * is positive. Each slot {@code i} is handed the {@code i}-th entry of a round-robin assignment
 * list computed once at construction time.
 *
 * @param <M> the type of message handled by the consumers
 * @since 9.1
 */
public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerPool.class);

    private final MQManager<M> manager;

    private final ConsumerFactory<M> factory;

    private final ConsumerPolicy policy;

    private final String mqName;

    /** One list of partitions per pool slot, indexed by the slot number given to {@link #getCallable(int)}. */
    private final List<List<MQPartition>> defaultAssignments;

    /**
     * Creates a consumer pool for the given MQ.
     *
     * @param mqName the name of the MQ to consume
     * @param manager the manager used to look up the appender and handed to every runner
     * @param factory the consumer factory handed to every runner
     * @param policy the consumer policy; its max threads caps the pool size and its name is the
     *            consumer group name
     */
    public ConsumerPool(String mqName, MQManager<M> manager, ConsumerFactory<M> factory, ConsumerPolicy policy) {
        super(computeNbThreads((short) manager.getAppender(mqName).size(), policy.getMaxThreads()));
        this.manager = manager;
        this.mqName = mqName;
        this.policy = policy;
        this.factory = factory;
        // Needs manager and mqName to be set, and relies on the thread count fixed by super(...).
        this.defaultAssignments = getDefaultAssignments();

        final String creationMessage = manager.supportSubscribe()
                ? "Creating consumer pool using MQ subscribe on " + mqName
                : "Creating consumer pool using MQ assignments on " + mqName + ": " + defaultAssignments;
        log.info(creationMessage);
    }

    /**
     * Computes the number of threads of the pool.
     *
     * @param maxConcurrency the upper bound imposed by the MQ
     * @param maxThreads the upper bound requested by the policy; a value {@code <= 0} means no cap
     * @return the smaller of the two bounds when {@code maxThreads} is positive, otherwise
     *         {@code maxConcurrency}
     */
    protected static short computeNbThreads(short maxConcurrency, short maxThreads) {
        if (maxThreads <= 0) {
            return maxConcurrency;
        }
        return (short) Math.min(maxConcurrency, maxThreads);
    }

    /**
     * @return the name of the consumer policy, used as the consumer group name
     */
    public String getConsumerGroupName() {
        return policy.getName();
    }

    /**
     * @return a status built with the {@code "error"} identifier, zeroed counters and the
     *         trailing flag set to {@code true}
     */
    @Override
    protected ConsumerStatus getErrorStatus() {
        return new ConsumerStatus("error", 0, 0, 0, 0, 0, 0, true);
    }

    /**
     * Builds the runner for pool slot {@code i}, giving it the {@code i}-th default assignment.
     *
     * @param i the index of the pool slot
     * @return a new {@link ConsumerRunner}
     */
    @Override
    protected Callable<ConsumerStatus> getCallable(int i) {
        final List<MQPartition> assignment = defaultAssignments.get(i);
        return new ConsumerRunner<>(factory, policy, manager, assignment);
    }

    @Override
    protected String getThreadPrefix() {
        return "Nuxeo-Consumer";
    }

    /**
     * Logs every individual status at info level, then the aggregated summary at warn level.
     *
     * @param ret the statuses returned by the runners
     */
    @Override
    protected void afterCall(List<ConsumerStatus> ret) {
        for (ConsumerStatus status : ret) {
            log.info(status);
        }
        final String summary = ConsumerStatus.toString(ret);
        log.warn(summary);
    }

    /**
     * Closes the pool by delegating to the parent implementation.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Computes the round-robin assignments for this pool: the MQ name is mapped to the size
     * reported by its appender, and that map is distributed over {@code getNbThreads()} slots by
     * {@link KafkaUtils#roundRobinAssignments}.
     */
    private List<List<MQPartition>> getDefaultAssignments() {
        final int appenderSize = manager.getAppender(mqName).size();
        final Map<String, Integer> sizeByMqName = Collections.singletonMap(mqName, appenderSize);
        return KafkaUtils.roundRobinAssignments(getNbThreads(), sizeByMqName);
    }

}