type
status
date
slug
summary
tags
category
icon
password
问题现象
负责的业务中有一个应用因为特殊原因,需要修改消息配置(将Spring Cloud Stream 改为 RocketMQ native),修改前和修改后的配置项如下:
原始订阅
修改订阅
但是当机器发布一半开始观察灰度的时候,出现消息积压的问题。
问题原因
消息订阅关系不一致
经过历史信息经验和积累,感觉有可能是订阅组和订阅器订阅关系不一致导致的消息堆积问题(因为订阅组的订阅关系是A,有的是B,MQ不能确定是否要消费,就能只能先堆积到broker中),查看MQ控制台后发现,确实是消息订阅关系不一致,导致消息堆积
已经发布的那台订阅关系如下(明显多于已经发布的机器的订阅关系)
Spring Cloud Stream 和 RocketMQ Native
所以就引申出一个问题,为什么将Spring Cloud Stream修改为原生的MetaQ之后,同一个CosumerId对应的订阅关系就会改变呢?
更简单来说,就是为什么当RocketMq和Spring Cloud Stream使用相同的CustomerId之后,RocketMQ的订阅关系会把Spring Cloud Stream的订阅关系给冲掉呢?
注意,一个customerId是可以订阅多个topic的
这个私活就只能翻启动源码来解答这个问题了
- MQConsumerInner:记录当前cosumerGroup和服务端的交互方式,以及topic和tag的映射关系。默认的实现是DefaultMQPushConsumerImpl,和consumerGroup的对应关系是1:1
- MQClientInstance:统一管理网络连接等可复用的对象,通过Map维护了ConsumerGroupId和MQConsumerInner的映射关系。简单来说,就是一个ConsumerGroup,只能对应一个MQConsumerInner。
如下代码所示:
Spring cloud Stream
Spring Cloud Stream是链接Spring和中间件的一个胶水层,在Spring Cloud Stream启动的时候,也会注册一个ConsumerGroup,如下代码所示:
问题原因
分析到这里,原因已经很明显了。Spring Cloud Stream会在启动的时候自己new一个metaPushConsumer(事实上就是一个新的MQConsumerInner),所以对于一个ConsumerGroup来说,就存在了两个MQConsumerInner,这显然是不符合RocketMQ要求的1:1的映射关系的,所以RocketMQ默认会用新的映射替代老的关系。显然,Spring Cloud Stream被RocketMQ原生的给替代掉了。
这也就是为什么已经发布的机器总,对于ConsumerA来说,只剩下RocketMQ原生的那组订阅关系了。
解决思路
修改consumerId
思考和总结
- 问题原因并不复杂,但是很多人可能分析到第一层(订阅关系不一致导致消息堆积)就不会继续分析了,但是我们还是需要有更深入的探索精神的。
- 生产环境尽量不要搞两套配置项,会额外增加理解成本。
中间件代码如何确定版本
arthas中的sc命令
Idea如何debug具体版本的中间件