请选择 进入手机版 | 继续访问电脑版

[java] 办理 kafka消息堆积及分区不匀称 的标题

[复制链接]
查看119 | 回复5 | 2021-9-14 08:04:08 | 显示全部楼层 |阅读模式
目次

kafka消息堆积及分区不匀称 的办理

我在环境中发当代 码内里 的kafka有所耽误 ,查看kafka消息发现堆积严峻 ,颠末 检查发现是kafka消息分区不匀称 造成的,消耗 速率 过慢。这里由本身 在假造 机上演示相干 题目 ,给大家提供相应题目 的参考思绪 。

这篇文章有点遗憾并没重现分区不均衡 的样例和Warning: Consumer group ‘testGroup1' is rebalancing. 这里仅将准确 的方式展示,等后续重现了在举行 补充。

紧张 有两个要点:

  • 1、一个消耗 者组只消耗 一个topic.
  • 2、factory.setConcurrency(concurrency);这里设置监听并发数为 部署单元节点*concurrency=分区数目

1、先在kafka消息中创建

对应分区数目 的topic(testTopic2,testTopic3)testTopic1由代码创建

  1. ./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2
复制代码

2、添加设置 文件application.properties

  1. kafka.test.topic1=testTopic1
  2. kafka.test.topic2=testTopic2
  3. kafka.test.topic3=testTopic3
  4. kafka.broker=192.168.25.128:9092
  5. auto.commit.interval.time=60000
  6. #kafka.test.group=customer-test
  7. kafka.test.group1=testGroup1
  8. kafka.test.group2=testGroup2
  9. kafka.test.group3=testGroup3
  10. kafka.offset=earliest
  11. kafka.auto.commit=false
  12. session.timeout.time=10000
  13. kafka.concurrency=2
复制代码

3、创建kafka工厂

  1. package com.yin.customer.config;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  8. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  9. import org.springframework.kafka.core.ConsumerFactory;
  10. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  11. import org.springframework.kafka.listener.AbstractMessageListenerContainer;
  12. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  13. import org.springframework.kafka.listener.ContainerProperties;
  14. import org.springframework.stereotype.Component;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. /**
  18. * @author yin
  19. * @Date 2019/11/24 15:54
  20. * @Method
  21. */
  22. @Configuration
  23. @Component
  24. public class KafkaConfig {
  25. @Value("${kafka.broker}")
  26. private String broker;
  27. @Value("${kafka.auto.commit}")
  28. private String autoCommit;
  29. // @Value("${kafka.test.group}")
  30. //private String testGroup;
  31. @Value("${session.timeout.time}")
  32. private String sessionOutTime;
  33. @Value("${auto.commit.interval.time}")
  34. private String autoCommitTime;
  35. @Value("${kafka.offset}")
  36. private String offset;
  37. @Value("${kafka.concurrency}")
  38. private Integer concurrency;
  39. @Bean
  40. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
  41. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  42. factory.setConsumerFactory(consumerFactory());
  43. //监听设置两个个分区
  44. factory.setConcurrency(concurrency);
  45. //打开批量拉取数据
  46. factory.setBatchListener(true);
  47. //这里设置的是心跳时间也是拉的时间,也就说每间隔max.poll.interval.ms我们就调用一次poll,kafka默认是300s,心跳只能在poll的时候发出,如果连续两次poll的时候超过
  48. //max.poll.interval.ms 值就会导致rebalance
  49. //心跳导致GroupCoordinator以为本地consumer节点挂掉了,引发了partition在consumerGroup里的rebalance。
  50. // 当rebalance后,之前该consumer拥有的分区和offset信息就失效了,同时导致不断的报auto offset commit failed。
  51. factory.getContainerProperties().setPollTimeout(3000);
  52. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  53. return factory;
  54. }
  55. private ConsumerFactory<String,String> consumerFactory() {
  56. return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
  57. }
  58. @Bean
  59. public Map<String, Object> consumerConfigs() {
  60. Map<String, Object> propsMap = new HashMap<>();
  61. //kafka的地址
  62. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
  63. //是否自动提交 Offset
  64. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  65. // enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑
  66. //默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka
  67. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  68. //这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
  69. //zookeeper.session.timeout.ms 默认值:6000
  70. //ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。
  71. // 如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。
  72. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);
  73. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  74. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  75. //组与组间的消费者是没有关系的。
  76. //topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。
  77. //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
  78. //当创建一个新分组的消费者时,auto.offset.reset值为latest时,
  79. // 表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。
  80. // https://blog.csdn.net/u012129558/article/details/80427016
  81. //earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
  82. // latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
  83. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
  84. //不是指每次都拉50条数据,而是一次最多拉50条数据()
  85. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
  86. return propsMap;
  87. }
  88. }
复制代码

4、展示kafka消耗 者

  1. @Component
  2. public class KafkaConsumer {
  3. private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  4. @KafkaListener(topics = "${kafka.test.topic1}",groupId = "${kafka.test.group1}",containerFactory = "kafkaListenerContainerFactory")
  5. public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
  6. logger.info("testTopic1 recevice a message size :{}" , records.size());
  7. try {
  8. for (ConsumerRecord<?, ?> record : records) {
  9. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  10. logger.info("received:{} " , record);
  11. if (kafkaMessage.isPresent()) {
  12. Object message = record.value();
  13. String topic = record.topic();
  14. Thread.sleep(300);
  15. logger.info("p1 topic is:{} received message={}",topic, message);
  16. }
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. } finally {
  21. ack.acknowledge();
  22. }
  23. }
  24. @KafkaListener(topics = "${kafka.test.topic2}",groupId = "${kafka.test.group2}",containerFactory = "kafkaListenerContainerFactory")
  25. public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
  26. logger.info("testTopic2 recevice a message size :{}" , records.size());
  27. try {
  28. for (ConsumerRecord<?, ?> record : records) {
  29. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  30. logger.info("received:{} " , record);
  31. if (kafkaMessage.isPresent()) {
  32. Object message = record.value();
  33. String topic = record.topic();
  34. Thread.sleep(300);
  35. logger.info("p2 topic :{},received message={}",topic, message);
  36. }
  37. }
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. ack.acknowledge();
  42. }
  43. }
  44. @KafkaListener(topics = "${kafka.test.topic3}",groupId = "${kafka.test.group3}",containerFactory = "kafkaListenerContainerFactory")
  45. public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
  46. logger.info("testTopic3 recevice a message size :{}" , records.size());
  47. try {
  48. for (ConsumerRecord<?, ?> record : records) {
  49. Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  50. logger.info("received:{} " , record);
  51. if (kafkaMessage.isPresent()) {
  52. Object message = record.value();
  53. String topic = record.topic();
  54. logger.info("p3 topic :{},received message={}",topic, message);
  55. Thread.sleep(300);
  56. }
  57. }
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. } finally {
  61. ack.acknowledge();
  62. }
  63. }
  64. }
复制代码

查看分区消耗 环境 :

在这里插入图片形貌

kafka出现多少 分区不消耗 的征象

克日 ,有用 户反馈kafka有topic出现某个消耗 组消耗 的时间 ,有几个分区不停 不消耗 消息,消息不停 积压(图1)。除了不停 积压外,还有一个征象 就是消耗 组不停 在重均衡 ,大约每5分钟就会重均衡 一次。详细 表现为消耗 分区的owner不停 在改变(图2)。

办理
kafka消息堆积及分区不匀称
的标题

(图1)

办理
kafka消息堆积及分区不匀称
的标题

(图2)

定位过程

业务侧没有报错,同时kafka服务端日记 也统统 正常,同事先将消耗 组的机器滚动重启,仍然 还是那几个分区没有消耗 ,之后将这几个不消耗 的分区迁移 至别的broker上,依然没有消耗 。

还有一个希奇 的地方,就是每次重均衡 后,不消耗 的那几个分区的消耗 owner地点 机器的网络都有流量变化。按理说不消耗 应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消耗 的consumer的jstack日记 。一看果然发现了题目 地点 。

办理
kafka消息堆积及分区不匀称
的标题

从堆栈看,consumer已经拉取到消息,然后就不停 卡在处理消息的业务逻辑上。这阐明 kafka是没有题目 的,用户的业务逻辑有题目 。

consumer在拉取完一批消息后,就不停 在处理这批消息,但是这批消息中有多少 条消息无法处理,而业务又没有超时操作或者非常 处理导致进程 不停 处于消耗 中,无法去poll下一批数据。

又由于业务采用的是autocommit的offset提交方式,而根据源码可知,consumer只有在下一次poll中才会自动 提交前次 poll的offset,以是 业务不停 在拉取同一批消息而无法更新offset。反映的征象 就是该consumer对应的分区的offset不停 没有变,以是 有积压的征象 。

至于为什么会不停 在重均衡 消耗 组的缘故起因 也很明确 了,就是由于 有消耗 者不停 卡在处理消息的业务逻辑上,超过了max.poll.interval.ms(默认5min),消耗 组就会将该消耗 者踢出消耗 组,从而发生重均衡 。

验证

让业务方去查证业务日记 ,验证了积压的这几个分区,总是在循环的拉取同一批消息。

办理 方法

暂时 办理 方法就是跳过有题目 的消息,将offset重置到有题目 的消息之后。本质上还是要业务侧修改业务逻辑,增长 超时或者非常 处理机制,最好不要采用自动 提交offset的方式,可以手动管理。

以上为个人履历 ,渴望 能给大家一个参考,也渴望 大家多多支持脚本之家。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

avatar 小仙女要起飞 | 2021-9-27 11:04:18 | 显示全部楼层
有节操!
回复

使用道具 举报

avatar 阿甘cx1982 | 2021-10-3 18:44:59 | 显示全部楼层
不错的帖子,值得收藏!
回复

使用道具 举报

avatar 眠眠不觉量 | 2021-10-6 01:40:59 | 显示全部楼层
顶顶更健康!
回复

使用道具 举报

大神好强大!
回复

使用道具 举报

青春不在了,青春痘还在!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则