背景
最近团队的小伙伴反馈了几个关于kafka消费者的问题
- 项目监听了几十个topic,启动时都需要进行初始化,很多时候本地调试时根本不需要启动消费者,但却被迫进行初始化,希望能优化这个问题。
- 由于我司在每个环境上,都区分了不同集群(如01、02、03等),来满足并行开发不同需求使用。但目前kafka并未根据不同集群做区分,导致partition消息无法被指定的某个集群消费,例如原本希望被集群01消费的消息,却被集群03消费了,所以希望能服务运行时动态控制不同集群消费者是否开启;
- kafka的topic调整partition数量后,消费者服务在不增加实例的前提下,想调整消费者服务的并发数只能手动修改代码后重新部署,非常麻烦,所以希望能在服务运行时动态调整消费者的并发数量,提高消费能力;
分析
我司项目都是基于SpringBoot,消费者使用@KafkaListener注解实现监听。
问题1
由于@KafkaListenr注解被KafkaListenerAnnotationBeanPostProcessor扫描,在postProcessAfterInitialization方法被解析后构造Consumer容器,且autoStartup属性默认为true,所以会自动启动。
要禁止自动启动Consumer容器比较简单,只要将Consumer容器设置成不自动启动即可,有以下解决方案:
- 在@KafkaListener注解指定autoStartup属性为false,该方法只针对使用注解的方法;
- 在@KafkaListener注解指定containerFactory属性,该属性值指向AbstractKafkaListenerContainerFactory实现类的实例,实例可以设置autoStartup属性为false;
在本地调试环境时可以将autoStartup属性设置为默认false不启动Consumer容器,但是在其他环境,还是需要启动Consumer容器的,所以就有以下两种方案
- 利用不同环境的配置文件,根据配置值来决定是否启动Consumer容器(@KafkaListener的autoStartup支持SpEL);
- 指定AbstractKafkaListenerContainerFactory的autoStartup为false,集成Apollo配置中心,并且在Spring容器完全启动成功后,再根据配置中心文件判断是否需要对Consumer容器进行start;
问题2
kafka通过Rebalance协议来规定一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个partition。
触发Rebalance的条件有3个:
- Group组员发生变化,例如有Consumer加入或离开Consumer Group;
- 订阅的Topic数量发生变化;
- 订阅的Topic的partition数量发生变化;
综上可以得知,我们只需让指定集群的Consumer在启动的时候,不加入Consumer Group(不启动Consumer),或者在已经加入Consumer Group前提下离开Consumer Group即可,从而触发Rebalance机制,最终达到只让指定集群的Consumer消费的目的。
至于想要在服务运行时进行动态调整,只需接入Apollo配置中心,不同集群使用不同的配置文件即可。
问题3
@KafkaListener注解上的concurrency属性,用来指定Consumer的并发数量。
目前项目想调整Consumer的并发数量,只能通过手动修改的代码后重新发布,其目的本质上来讲,是修改
AbstractKafkaListenerContainerFactory实例上的concurrency属性,然后重启AbstractKafkaListenerContainerFactory实例(其实就是调用stop()方法后再调用start()方法),最终在启动的时候,消费者服务会根据concurrency启动相应数量的消费者。
至于想要在服务运行时进行动态调整,只需配合Apoll配置中心实现即可。
实现
在问题1的两种方案中,由于方案2可以更灵活控制,所以此处直接使用方案2
@Bean(KAFKA_CONSUMER_FACTORY)
public KafkaListenerContainerFactory<?> kafkaConsumerContainerFactory() {
Map<String, Object> properties = new HashMap<>();
//...其他配置
DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);
//支持并发的消费者容器工厂
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置Consumer不自动启动
factory.setAutoStartup(false);
return factory;
}
//@KafkaListener的autoStartup属性,优先级高于containerFactory的配置,二选一即可
@KafkaListener(id = "listenerId-1", topics = "topic-1",
containerFactory = KAFKA_CONSUMER_FACTORY,
autoStartup = false)
public void listener1(String message) {
log.info("listener1 message:{}",message);
}
实现了Consumer容器禁止自动启动,那在哪个时间点让Consumer容器进行start呢,此处选择ApplicationRunner接口回调时间点。
@Slf4j
public class KafkaConsumerRefresh implements ApplicationContextAware, ApplicationRunner {
//定义apollo配置的key前缀
//最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//为了在Spring容器启动后,还能获取到Consumer容器,需要注入Spring容器。
//此处选择实现ApplicationContextAware接口
this.applicationContext = applicationContext;
}
//ApplicationRunner回调方法
@Override
public void run(ApplicationArguments args) throws Exception {
//使用@KafkaListener注解构成的Consumer,最终会被注册到KafkaListenerEndpointRegistry实例对象中,
// 详见org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization
KafkaListenerEndpointRegistry endpointRegistry = this.applicationContext.getBean(KafkaListenerEndpointRegistry.class);
//获取所有的Consumer容器
for (MessageListenerContainer listenerContainer : endpointRegistry.getListenerContainers()) {
//获取指定listenerId对应配置值,表示是否需要启动Consumer容器,默认不启动
Boolean needStart = ConfigService.getAppConfig().getBooleanProperty(KAFKA_REFRESH_CONSUMER_PREFIXES + listenerContainer.getListenerId(), false);
if (needStart){
//启动容器中的Consumer
listenerContainer.start();
}
}
}
}
为了实现服务运行时可以感知Apollo配置文件变更,从而动态调整程序,需要实现监听Apollo,代码如下
@Slf4j
public class KafkaConsumerRefresh implements ApplicationContextAware, ApplicationRunner, ConfigChangeListener {
//定义apollo配置的key前缀
//最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";
private ApplicationContext applicationContext;
@PostConstruct
public void init(){
//注入Apollo监听指定前缀的配置
ConfigService.getAppConfig().addChangeListener(this,null, Sets.newHashSet(KAFKA_REFRESH_CONSUMER_PREFIXES));
}
//apollo配置变更回调
@Override
public void onChange(ConfigChangeEvent configChangeEvent) {
Set<String> changedKeys = configChangeEvent.changedKeys();
for (String changedKey : changedKeys) {
String key = ConfigService.getAppConfig().getProperty(changedKey, "");
String listenerId = getListenerIdByPropertiesName(key);
log.info("变更配置 key:{},listenerId:{}",key,listenerId);
}
}
///...其他方法略
}
由于需要满足的对Consumer容器启用、停用、调整并发,所以需要Apollo上配置对应的值,我们使用json来定义。
@Data
public class KafkaRefreshProperties {
//listsnerId
private String listenerId;
//consumer容器是否启用
private boolean start;
//consumer是否暂停
private boolean pause;
//consumer容器并发数量
private Integer concurrency;
//是否在应用启动完成后再启动consumer
private boolean afterAppRunnerStart;
}
关于Consumer容器与Consumer的区别,可以简单看成一对多的关系,即容器里面维护多个Consumer,从而形成Consumer并发。当Consumer容器进行stop操作时,所维护的多个Consumer都会退出Consumer Group,从而触发Rebalance。Consumer也可以进行pause和resume操作,pause时,Consumer将暂停消费partition消息,但是不会退出Consumer Group,所以不会触发Rebalance,此时可以用resume来恢复Consumer消费。
有了动态配置的数据结构,下面就可以实现对Consumer容器的动态调整了
@Slf4j
public class KafkaConsumerRefresh implements ConfigChangeListener, ApplicationContextAware, ApplicationRunner, ApplicationListener<KafkaEvent> {
//定义apollo配置的key前缀
//最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";
private ApplicationContext applicationContext;
@PostConstruct
public void init(){
//Apollo注入指定前缀监听
ConfigService.getAppConfig().addChangeListener(this,null, Sets.newHashSet(KAFKA_REFRESH_CONSUMER_PREFIXES));
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//为了在Spring容器启动后,还能获取到Consumer容器,需要注入Spring上下文容器。
//此处选择实现ApplicationContextAware接口
this.applicationContext = applicationContext;
}
@Override
public void onChange(ConfigChangeEvent configChangeEvent) {
Set<String> changedKeys = configChangeEvent.changedKeys();
log.info("changedKeys:{}",changedKeys);
//删除操作忽略,新增、修改操作需要变动
changedKeys.stream().filter(key-> configChangeEvent.getChange(key).getChangeType() != PropertyChangeType.DELETED).map(this::getRefreshPropertiesByPropertiesName).forEach(this::doChange);
}
//配置文件变更,执行操作
private void doChange(KafkaRefreshProperties refreshProperties){
if (!refreshProperties.isStart()){
this.stop(refreshProperties);
}else {
this.start(refreshProperties);
}
}
//停止Consumer容器
public void stop(KafkaRefreshProperties refreshProperties){
List<MessageListenerContainer> listenerContainers = load(refreshProperties.getListenerId());
if (listenerContainers == null || listenerContainers.size() == 0){
return;
}
for (MessageListenerContainer container : listenerContainers) {
container.stop();
}
}
//启动Consumer容器
public void start(KafkaRefreshProperties refreshProperties){
List<MessageListenerContainer> listenerContainers = load(refreshProperties.getListenerId());
if (listenerContainers == null || listenerContainers.size() == 0){
return;
}
for (MessageListenerContainer container : listenerContainers) {
//调整并发
if (container instanceof ConcurrentMessageListenerContainer<?, ?> && refreshProperties.getConcurrency() != null) {
ConcurrentMessageListenerContainer concurrentListenerContainer = (ConcurrentMessageListenerContainer) container;
if (concurrentListenerContainer.getConcurrency() != refreshProperties.getConcurrency()) {
//调整并发
concurrentListenerContainer.setConcurrency(refreshProperties.getConcurrency());
// 如果当前状态为运行中,需要先stop再start
concurrentListenerContainer.stop();
}
}
//恢复
if (container.isPauseRequested() && !refreshProperties.isPause()){
container.resume();
}
//暂停
if (!container.isPauseRequested() && refreshProperties.isPause()){
container.pause();
}
//执行启动操作
container.start();
}
}
//propertiesName解析成listenerId
public String getListenerIdByPropertiesName(String propertiesName){
return propertiesName.substring(KAFKA_REFRESH_CONSUMER_PREFIXES.length());
}
//propertiesName获取值,并解析成KafkaRefreshProperties配置对象
public KafkaRefreshProperties getRefreshPropertiesByPropertiesName(String propertiesName){
String listenerId = this.getListenerIdByPropertiesName(propertiesName);
KafkaRefreshProperties property = ConfigService.getAppConfig().getProperty(propertiesName, value -> JSON.parseObject(value, KafkaRefreshProperties.class), null);
if (property != null){
property.setListenerId(listenerId);
}
return property;
}
//根据listenerId获取Consumer容器
public List<MessageListenerContainer> load(String listenerId){
//使用@KafkaListener注解构成的Consumer,最终会被注册到KafkaListenerEndpointRegistry实例对象中,
// 详见org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization
KafkaListenerEndpointRegistry endpointRegistry = this.applicationContext.getBean(KafkaListenerEndpointRegistry.class);
return endpointRegistry.getListenerContainers().stream().filter(f-> Objects.equals(f.getListenerId(), listenerId)).collect(Collectors.toList());
}
//获取存在的动态配置
private Set<KafkaRefreshProperties> getExistsProperties() {
return ConfigService.getAppConfig().getPropertyNames().stream().filter(f -> f.startsWith(KAFKA_REFRESH_CONSUMER_PREFIXES))
.map(this::getRefreshPropertiesByPropertiesName).filter(Objects::nonNull).collect(Collectors.toSet());
}
//ApplicationRunner回调方法
@Override
public void run(ApplicationArguments args) throws Exception {
//筛选需要在应用容器启动后自动启动的Consumer容器,并执行启动操作
getExistsProperties().stream().filter(KafkaRefreshProperties::isAfterAppRunnerStart)
.filter(KafkaRefreshProperties::isStart)
.forEach(this::start);
}
//kafka事件变更监听
@Override
public void onApplicationEvent(KafkaEvent event) {
log.info("KafkaEvent:{}",event);
}
}
来到这里,核心代码基本就完成了。
除此之外,为了方便观察,还可以实现了ApplicationListener接口,并监听KafkaEvent,可以观察到参数调整对Kafka消费者的影响。
接下来就只需要将KafkaConsumerRefresh实例注入到Spring容器
@Slf4j
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumerRefresh kafkaConsumerRefresh(){
return new KafkaConsumerRefresh();
}
}
然后在Apollo配置中心配置以下内容
kafka.refresh.consumer.listenerId-1 = {\n "start": true,\n "pause": false,\n "concurrency": 2,\n "afterAppRunnerStart":true\n}
该配置表示listenerId-1的Consumer容器,需要启动,并且在应用容器启动后再启动,并且Consumer并发数为2。
后续如果需要调整,只需要修改json中对应字段的值,再发版,即可动态调整kafka消费者相关参数。
总结
本按理只是简单对Consumer常见的参数今天调整,按理来说,支持对Consumer更多参数进行拓展调整,期待各位大佬的实现;
同时也可以举一反三,除了Kafka可以使用这种操作不停服调整消费者参数,那么RabbitMQ,或者其他类似中间件的消费者,在基于Spring或SpringBoot的基础上,是否也可以这样操作呢?