写一篇RocketMQ卷文让自己冷静一下
大黄 Lv4

RocketMQ卷文 background.jpeg


不吃(烧烤)不喝(奶茶可乐)看了好久才概括出这么一点点东西,希望大佬们能够有耐心看一看,遇到说的不对的地方,也欢迎在评论区或者私信与我交流

另外完整版的代码注释,我在我的github上也添加了,感兴趣的小伙伴也可以点击这个链接去看一波 github地址

觉得我讲的有那么一点点道理,对你有那么一丢丢的帮助的,也可以给我一波点赞关注666哟~

废话不多说,下面开始我的表演~

RocketMQ全局流程图

RocketMQ卷文 全流程.jpeg

上来就是这么一大张图片,相信大家肯定完全不想看下去。(那么我为什么还要放在一开始呢?主要是为了能够让大家有一个全局的印象,然后后续复习的时候也可以根据这个流程图去具体复习)

那么,下面我们就针对一些问题来具体描述RocketMQ的工作流程 此处内容会不断补充,也欢迎大家把遇到的问题在评论区留下来

消息消费逻辑

消息消费可以分为三大模块

  • Rebalance
  • 拉取消息
  • 消费消息

Rebalance

RocketMQ卷文 Rebalance流程.jpeg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// RebalanceImpl
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历每个主题的队列
// subTable 会在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 时修改
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 对队列进行重新负载
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}

this.truncateMessageQueueNotMyTopic();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// topicSubscribeInfoTable topic订阅信息缓存表
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 发送请求到broker获取topic下该消费组内当前所有的消费者客户端id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

// 排序保证了同一个消费组内消费者看到的视图保持一致,确保同一个消费队列不会被多个消费者分配
Collections.sort(mqAll);
Collections.sort(cidAll);

// 分配算法 (尽量使用前两种)
// 默认有5种 1)平均分配 2)平均轮询分配 3)一致性hash
// 4)根据配置 为每一个消费者配置固定的消息队列 5)根据broker部署机房名,对每个消费者负责不同的broker上的队列
// 但是如果消费者数目大于消息队列数量,则会有些消费者无法消费消息
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

// 当前消费者分配到的队列
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

// 更新消息消费队列,如果是新增的消息消费队列,则会创建一个消息拉取请求并立即执行拉取
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();

if (mq.getTopic().equals(topic)) {
// 当前分配到的队列中不包含原先的队列(说明当前队列被分配给了其他消费者)
if (!mqSet.contains(mq)) {
// 丢弃 processQueue
pq.setDropped(true);
// 移除当前消息队列
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 消息消费队列缓存中不存在当前队列 本次分配新增的队列
if (!this.processQueueTable.containsKey(mq)) {
// 向broker发起锁定队列请求 (向broker端请求锁定MessageQueue,同时在本地锁定对应的ProcessQueue)
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
// 加锁失败,跳过,等待下一次队列重新负载时再尝试加锁
continue;
}

// 从内存中移除该消息队列的消费进度
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();

long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}

if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 首次添加,构建拉取消息的请求
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

// 立即拉取消息(对新增的队列)
this.dispatchPullRequest(pullRequestList);

return changed;
}

由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:

  1. 从Broker获取订阅当前Topic的消费者列表
  2. 根据具体的策略进行负载均衡
  3. 对当前消费者分配到的队列进行处理
    1. 原来有,现在没有:丢弃对应的消息处理队列(ProcessQueue)
    2. 原来没有,现在有:添加消息处理队列(ProcessQueue),如果是第一次新增,还会创建一个消息拉取请求

拉取消息

RocketMQ卷文 拉取消息流程.jpeg

拉取消息的代码太多了,我就不再这里贴出来了。

我在这里说一下大致流程,然后有几个需要注意的地方

流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作

这里说一个RocketMQ消费者这边设计的一个亮点

它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)

另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因

既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式

  • 表达式过滤
    • tag
    • SQL92
  • 类过滤

消费消息

RocketMQ卷文 消费消息.jpeg

这边也先说几个注意点吧,后面再单独出篇文章。

(一)顺序消费和非顺序消费消费失败的处理

(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量

(三)广播消息失败不会重试,仅打印失败日志

补充:为什么同一个消费组下消费者的订阅信息要相同

首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同

即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费

参考文档:阿里云:订阅关系一致

RocketMQ卷文 Rebalance详情.jpeg

我们在看待这个问题的时候,可以把它分为两类情况考虑

  • topic不一致
  • tag不一致

(一)topic不一致的问题

首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest

在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。

(二)tag不一致的问题

随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉

消费者总结

讲了这么多的消费者的内容,出现了好多名词,也把消费者的一些比较核心的内容逐个讲了一遍。

那么,在这里,我们将消费者这个模块里面的所有东西,在进行一个完整的串联。然后消费者这一方面的介绍就要告一段落了

RocketMQ卷文 消费者完整流程.jpeg

延时队列是如何工作的

RocketMQ卷文 延迟队列.jpg

由流程图中我们不难看出,RocketMQ对延时消息的处理,是交由Timer去完成的(相关类ScheduleMessageService)。在Timer的任务队列中读取需要处理的延迟任务,将消息从延迟队列转发到具体的业务队列中

此处补充一点:此处提到的Timer为java工具类包(java.util.Timer)下的一个定时任务工具。它主要由两个部分:TaskQueue queue(任务队列)和TimerThread thread(工作线程)。这边我把它简单的类比为一个单线程的工作线程池

另外在ScheduleMessageService中使用到了Timer的两个方法,我在这里先单独列出来下

  • this.timer.schedule :在任务执行成功后,再加上对应的周期,然后再执行
  • this.timer.scheduleAtFixedRate :每隔指定时间就执行一次,与任务执行时间无关

话不多少,贴上源码(源码虽然枯燥,但希望可以耐心的看完)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ScheduleMessageService
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 根据延时队列创建对应的定时任务
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
// 第一次,延迟一秒执行任务,后续根据对应延时时间来执行
// 延时级别和消息队列id对应关系 : 消息队列id = 延时级别 - 1
// shedule 在任务执行成功后,再加上对应的周期,然后再执行
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}

// scheduleAtFixedRate 每隔指定时间就执行一次,与任务执行时间无关
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
if (started.get()) {
// 每个十秒持久化一次延迟队列的处理进度
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// DeliverDelayedMessageTimerTask
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}

public void executeOnTimeup() {
// 根据 延时队列topic 和 延时队列id 查找消费队列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;

if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// 遍历ConsumeQueue,每一个标准的ConsumeQueue条目为20字节
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();

if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}

long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

// > 0 未到消息消费时间
long countdown = deliverTimestamp - now;

if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);

if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 放到对应的 %RETRY%+gid 重试topic下进行消费(转发消息)
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);

if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
}
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
}
}
} else {
// 会将下次任务执行时间设置为countdown 即 消息的延时转发时间-当前时间
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for

// 更新延时队列拉取任务进度
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {

bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// 消费队列不存在,默认为没有需要消费的任务,跳过本次消费

long cqMinOffset = cq.getMinOffsetInQueue();
long cqMaxOffset = cq.getMaxOffsetInQueue();
if (offset < cqMinOffset) {
// 下次拉取任务进度更新
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}

if (offset > cqMaxOffset) {
failScheduleOffset = cqMaxOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
}
} // end of if (cq != null)

// 根据延时等级创建一个任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
  • Post title:写一篇RocketMQ卷文让自己冷静一下
  • Post author:大黄
  • Create time:2021-12-08 14:56:51
  • Post link:https://huangbangjing.cn/2021/12/08/写一篇RocketMQ卷文让自己冷静一下/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.