消费消息逻辑
消费消息逻辑主要分为三个模块
- Rebalance
- 拉取消息
- 消费消息
Rebalance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// RebalanceImpl
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历每个主题的队列
// subTable 会在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 时修改
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 对队列进行重新负载
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// topicSubscribeInfoTable topic订阅信息缓存表
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 发送请求到broker获取topic下该消费组内当前所有的消费者客户端id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序保证了同一个消费组内消费者看到的视图保持一致,确保同一个消费队列不会被多个消费者分配
Collections.sort(mqAll);
Collections.sort(cidAll);
// 分配算法 (尽量使用前两种)
// 默认有5种 1)平均分配 2)平均轮询分配 3)一致性hash
// 4)根据配置 为每一个消费者配置固定的消息队列 5)根据broker部署机房名,对每个消费者负责不同的broker上的队列
// 但是如果消费者数目大于消息队列数量,则会有些消费者无法消费消息
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
// 当前消费者分配到的队列
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新消息消费队列,如果是新增的消息消费队列,则会创建一个消息拉取请求并立即执行拉取
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 当前分配到的队列中不包含原先的队列(说明当前队列被分配给了其他消费者)
if (!mqSet.contains(mq)) {
// 丢弃 processQueue
pq.setDropped(true);
// 移除当前消息队列
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 消息消费队列缓存中不存在当前队列 本次分配新增的队列
if (!this.processQueueTable.containsKey(mq)) {
// 向broker发起锁定队列请求 (向broker端请求锁定MessageQueue,同时在本地锁定对应的ProcessQueue)
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
// 加锁失败,跳过,等待下一次队列重新负载时再尝试加锁
continue;
}
// 从内存中移除该消息队列的消费进度
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 首次添加,构建拉取消息的请求
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 立即拉取消息(对新增的队列)
this.dispatchPullRequest(pullRequestList);
return changed;
}集群模式下消息负载的步骤
由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:
- 从Broker获取订阅当前Topic的消费者列表
- 根据具体的策略进行负载均衡
- 对当前消费者分配到的队列进行处理
拉取消息的大致流程
这里说一下大致流程,然后有几个需要注意的地方
流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作
这里说一个RocketMQ消费者这边设计的一个亮点
它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)
另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因
既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式
- 表达式过滤
- tag
- SQL92
- 类过滤
消费消息
这边也先说几个注意点吧,后面再单独更新一篇文章。
(一)顺序消费和非顺序消费消费失败的处理
非顺序消费:直接丢入延时队列中,等待重试。顺序消息:本地重试
(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量
(三)广播消息失败不会重试,仅打印失败日志
一些补充
为什么同一个消费组下消费者的订阅信息要相同
首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同
即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费
参考文档:阿里云:订阅关系一致
我们在看待这个问题的时候,可以把它分为两类情况考虑
- topic不一致
- tag不一致
(一)topic不一致的问题
首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest
在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。
偏移量不发生变化
(二)tag不一致的问题
随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉
过滤了,偏移量在消费后直接更新
关于消息消费的推拉模式
MQPushConsumer方式,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。(我们日常开发时使用到的模式)
MQPullConsumer方式,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
总结:我们日常开发中使用到的是RocketMQ的推模式(虽然底层上的本质还是拉模式),但是不需要我们去处理拉取消息的动作触发等。如果我们要使用RocketMQ的拉模式,就需要我们自己实现从队列中拿拉取请求,然后显示调用拉消息的api,另外还要去往队列中放置拉取请求等操作。
消费幂等控制
除了基于业务状态等操作控制幂等的基础上,还可以通过消息的唯一id进行判断当前消息是否消费过
下图是生产者生产消息时产生的唯一标识 **UNIQ_KEY **,消息重试发送,这个msgid是不会变化的
对应的消息内容
1 | ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a , queueId=14, storeSize=178, queueOffset=3, sysFlag=0, bornTimestamp=1659604568508, bornHost=/192.168.196.123:50628, storeTimestamp=1659604568547, storeHost=/172.30.0.4:10911, msgId=AC1E000400002A9F0000000000002B12, commitLogOffset=11026, bodyCRC=717801981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1659663419735, UNIQ_KEY=0000010163E018B4AAC21327B1BC003E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50], transactionId='null'}]] |
消费者总结
讲了这么多的消费者的内容,出现了好多名词,也把消费者的一些比较核心的内容逐个讲了一遍。
那么,在这里,我们将消费者这个模块里面的所有东西,在进行一个完整的串联。然后消费者这一方面的介绍就要告一段落了
附一张丁威老师的流程图
- Post title:RocketMQ基础篇Consumer消费消息
- Post author:大黄
- Create time:2022-08-05 10:10:02
- Post link:https://huangbangjing.cn/2022/08/05/RocketMQ基础篇Consumer消费消息/
- Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.