RocketMQ基础篇Consumer消费消息
大黄 Lv4

消费消息逻辑

消费消息逻辑主要分为三个模块

  • Rebalance
  • 拉取消息
  • 消费消息

    Rebalance

    RocketMQ基础篇 Consumer消费消息 流程图.png
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // RebalanceImpl
    public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
    // 遍历每个主题的队列
    // subTable 会在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 时修改
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
    final String topic = entry.getKey();
    try {
    // 对队列进行重新负载
    this.rebalanceByTopic(topic, isOrder);
    } catch (Throwable e) {
    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    log.warn("rebalanceByTopic Exception", e);
    }
    }
    }
    }

    this.truncateMessageQueueNotMyTopic();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
    case BROADCASTING: {
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
    if (changed) {
    this.messageQueueChanged(topic, mqSet, mqSet);
    log.info("messageQueueChanged {} {} {} {}",
    consumerGroup,
    topic,
    mqSet,
    mqSet);
    }
    } else {
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;
    }
    case CLUSTERING: {
    // topicSubscribeInfoTable topic订阅信息缓存表
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    // 发送请求到broker获取topic下该消费组内当前所有的消费者客户端id
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    if (null == mqSet) {
    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    }

    if (null == cidAll) {
    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    }

    if (mqSet != null && cidAll != null) {
    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    mqAll.addAll(mqSet);

    // 排序保证了同一个消费组内消费者看到的视图保持一致,确保同一个消费队列不会被多个消费者分配
    Collections.sort(mqAll);
    Collections.sort(cidAll);

    // 分配算法 (尽量使用前两种)
    // 默认有5种 1)平均分配 2)平均轮询分配 3)一致性hash
    // 4)根据配置 为每一个消费者配置固定的消息队列 5)根据broker部署机房名,对每个消费者负责不同的broker上的队列
    // 但是如果消费者数目大于消息队列数量,则会有些消费者无法消费消息
    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

    // 当前消费者分配到的队列
    List<MessageQueue> allocateResult = null;
    try {
    allocateResult = strategy.allocate(
    this.consumerGroup,
    this.mQClientFactory.getClientId(),
    mqAll,
    cidAll);
    } catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    e);
    return;
    }

    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    if (allocateResult != null) {
    allocateResultSet.addAll(allocateResult);
    }

    // 更新消息消费队列,如果是新增的消息消费队列,则会创建一个消息拉取请求并立即执行拉取
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    if (changed) {
    log.info(
    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
    allocateResultSet.size(), allocateResultSet);
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
    }
    }
    break;
    }
    default:
    break;
    }
    }

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
    // 当前分配到的队列中不包含原先的队列(说明当前队列被分配给了其他消费者)
    if (!mqSet.contains(mq)) {
    // 丢弃 processQueue
    pq.setDropped(true);
    // 移除当前消息队列
    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
    it.remove();
    changed = true;
    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
    }
    } else if (pq.isPullExpired()) {
    switch (this.consumeType()) {
    case CONSUME_ACTIVELY:
    break;
    case CONSUME_PASSIVELY:
    pq.setDropped(true);
    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
    it.remove();
    changed = true;
    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
    consumerGroup, mq);
    }
    break;
    default:
    break;
    }
    }
    }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
    // 消息消费队列缓存中不存在当前队列 本次分配新增的队列
    if (!this.processQueueTable.containsKey(mq)) {
    // 向broker发起锁定队列请求 (向broker端请求锁定MessageQueue,同时在本地锁定对应的ProcessQueue)
    if (isOrder && !this.lock(mq)) {
    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    // 加锁失败,跳过,等待下一次队列重新负载时再尝试加锁
    continue;
    }

    // 从内存中移除该消息队列的消费进度
    this.removeDirtyOffset(mq);
    ProcessQueue pq = new ProcessQueue();

    long nextOffset = -1L;
    try {
    nextOffset = this.computePullFromWhereWithException(mq);
    } catch (Exception e) {
    log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
    continue;
    }

    if (nextOffset >= 0) {
    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    if (pre != null) {
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    } else {
    // 首次添加,构建拉取消息的请求
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
    }
    } else {
    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    }
    }
    }

    // 立即拉取消息(对新增的队列)
    this.dispatchPullRequest(pullRequestList);

    return changed;
    }

    集群模式下消息负载的步骤

    由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:
  1. 从Broker获取订阅当前Topic的消费者列表
  2. 根据具体的策略进行负载均衡
  3. 对当前消费者分配到的队列进行处理
    1. 原来有,现在没有:丢弃对应的消息处理队列(ProcessQueue)
    2. 原来没有,现在有:添加消息处理队列(ProcessQueue),如果是第一次新增,还会创建一个消息拉取请求

      Rebalance的策略

      RocketMQ基础篇 Consumer消费消息 Rebalance策略.png

      拉取消息

      RocketMQ基础篇 Consumer消费消息 拉取消息流程图.png
      拉取消息的代码太多了,就不再这里贴出来了。

拉取消息的大致流程

这里说一下大致流程,然后有几个需要注意的地方
流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作
这里说一个RocketMQ消费者这边设计的一个亮点
它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)
另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因
既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式

  • 表达式过滤
    • tag
    • SQL92
  • 类过滤

消费消息

RocketMQ基础篇 Consumer消费消息 消费消息流程图.png
这边也先说几个注意点吧,后面再单独更新一篇文章。
(一)顺序消费和非顺序消费消费失败的处理
非顺序消费:直接丢入延时队列中,等待重试。顺序消息:本地重试
(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量
(三)广播消息失败不会重试,仅打印失败日志

一些补充

为什么同一个消费组下消费者的订阅信息要相同

首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同
即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费
参考文档:阿里云:订阅关系一致
RocketMQ基础篇 Consumer消费消息 订阅关系一致说明.png
我们在看待这个问题的时候,可以把它分为两类情况考虑

  • topic不一致
  • tag不一致

    (一)topic不一致的问题

    首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest
    在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。
    偏移量不发生变化

(二)tag不一致的问题

随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉
过滤了,偏移量在消费后直接更新

关于消息消费的推拉模式

MQPushConsumer方式,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。(我们日常开发时使用到的模式)
MQPullConsumer方式,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
总结:我们日常开发中使用到的是RocketMQ的推模式(虽然底层上的本质还是拉模式),但是不需要我们去处理拉取消息的动作触发等。如果我们要使用RocketMQ的拉模式,就需要我们自己实现从队列中拿拉取请求,然后显示调用拉消息的api,另外还要去往队列中放置拉取请求等操作。

消费幂等控制

除了基于业务状态等操作控制幂等的基础上,还可以通过消息的唯一id进行判断当前消息是否消费过
下图是生产者生产消息时产生的唯一标识 **UNIQ_KEY **,消息重试发送,这个msgid是不会变化的
RocketMQ基础篇 Consumer消费消息 通过msgid控制消费幂等.png
对应的消息内容

1
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a  , queueId=14, storeSize=178, queueOffset=3, sysFlag=0, bornTimestamp=1659604568508, bornHost=/192.168.196.123:50628, storeTimestamp=1659604568547, storeHost=/172.30.0.4:10911, msgId=AC1E000400002A9F0000000000002B12, commitLogOffset=11026, bodyCRC=717801981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1659663419735, UNIQ_KEY=0000010163E018B4AAC21327B1BC003E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50], transactionId='null'}]]

消费者总结

讲了这么多的消费者的内容,出现了好多名词,也把消费者的一些比较核心的内容逐个讲了一遍。
那么,在这里,我们将消费者这个模块里面的所有东西,在进行一个完整的串联。然后消费者这一方面的介绍就要告一段落了
RocketMQ基础篇 Consumer消费消息 完整流程图2.png
附一张丁威老师的流程图
RocketMQ基础篇 Consumer消费消息 丁威老师流程图.jpeg

  • Post title:RocketMQ基础篇Consumer消费消息
  • Post author:大黄
  • Create time:2022-08-05 10:10:02
  • Post link:https://huangbangjing.cn/2022/08/05/RocketMQ基础篇Consumer消费消息/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.