不吃(烧烤)不喝(奶茶可乐)看了好久才概括出这么一点点东西,希望大佬们能够有耐心看一看,遇到说的不对的地方,也欢迎在评论区或者私信与我交流
另外完整版的代码注释,我在我的github上也添加了,感兴趣的小伙伴也可以点击这个链接去看一波 github地址
觉得我讲的有那么一点点道理,对你有那么一丢丢的帮助的,也可以给我一波点赞关注666哟~
废话不多说,下面开始我的表演~
RocketMQ全局流程图
上来就是这么一大张图片,相信大家肯定完全不想看下去。(那么我为什么还要放在一开始呢?主要是为了能够让大家有一个全局的印象,然后后续复习的时候也可以根据这个流程图去具体复习)
那么,下面我们就针对一些问题来具体描述RocketMQ的工作流程 此处内容会不断补充,也欢迎大家把遇到的问题在评论区留下来
消息消费逻辑
消息消费可以分为三大模块
- Rebalance
- 拉取消息
- 消费消息
Rebalance
1 | // RebalanceImpl |
1 | private void rebalanceByTopic(final String topic, final boolean isOrder) { |
由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:
- 从Broker获取订阅当前Topic的消费者列表
- 根据具体的策略进行负载均衡
- 对当前消费者分配到的队列进行处理
- 原来有,现在没有:丢弃对应的消息处理队列(ProcessQueue)
- 原来没有,现在有:添加消息处理队列(ProcessQueue),如果是第一次新增,还会创建一个消息拉取请求
拉取消息
拉取消息的代码太多了,我就不再这里贴出来了。
我在这里说一下大致流程,然后有几个需要注意的地方
流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作
这里说一个RocketMQ消费者这边设计的一个亮点
它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)
另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因
既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式
- 表达式过滤
- tag
- SQL92
- 类过滤
消费消息
这边也先说几个注意点吧,后面再单独出篇文章。
(一)顺序消费和非顺序消费消费失败的处理
(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量
(三)广播消息失败不会重试,仅打印失败日志
补充:为什么同一个消费组下消费者的订阅信息要相同
首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同
即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费
参考文档:阿里云:订阅关系一致
我们在看待这个问题的时候,可以把它分为两类情况考虑
- topic不一致
- tag不一致
(一)topic不一致的问题
首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest
在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。
(二)tag不一致的问题
随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉
消费者总结
讲了这么多的消费者的内容,出现了好多名词,也把消费者的一些比较核心的内容逐个讲了一遍。
那么,在这里,我们将消费者这个模块里面的所有东西,在进行一个完整的串联。然后消费者这一方面的介绍就要告一段落了
延时队列是如何工作的
由流程图中我们不难看出,RocketMQ对延时消息的处理,是交由Timer去完成的(相关类ScheduleMessageService)。在Timer的任务队列中读取需要处理的延迟任务,将消息从延迟队列转发到具体的业务队列中
此处补充一点:此处提到的Timer为java工具类包(java.util.Timer)下的一个定时任务工具。它主要由两个部分:TaskQueue queue(任务队列)和TimerThread thread(工作线程)。这边我把它简单的类比为一个单线程的工作线程池
另外在ScheduleMessageService中使用到了Timer的两个方法,我在这里先单独列出来下
- this.timer.schedule :在任务执行成功后,再加上对应的周期,然后再执行
- this.timer.scheduleAtFixedRate :每隔指定时间就执行一次,与任务执行时间无关
话不多少,贴上源码(源码虽然枯燥,但希望可以耐心的看完)
1 | // ScheduleMessageService |
1 | // DeliverDelayedMessageTimerTask |
- Post title:写一篇RocketMQ卷文让自己冷静一下
- Post author:大黄
- Create time:2021-12-08 14:56:51
- Post link:https://huangbangjing.cn/2021/12/08/写一篇RocketMQ卷文让自己冷静一下/
- Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.