RocketMQ基础篇Producer发送消息
大黄 Lv4

RocketMQ基础篇 Producer发送消息 流程图.png
生产者发送消息的主要流程图如上图所示。具体的代码由于比较多,我就不在这边贴出来的。
主要讲一下我认为比较重要的点

消息队列负载均衡

Producer会每隔30s从Namesrv获取最新的Topic路由信息,并缓存到本地

1
2
3
4
5
6
7
8
9
10
11
12
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
// 每个30s从NameServer更新路由表信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

路由信息就是用来发送时选择具体的Broker和队列的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

// 获取Topic队列信息
// 此处的流程:
// 先从本地缓存中获取,获取到返回
// 没有获取到.从NameSrv中获取,获取到返回
// 没有获取到.如果能够自动创建Topic,会把消息放到TBW102.后续会有自动创建Topic的逻辑
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);

mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
// 省略代码....
}
// 省略代码....
}

此处流程可以总结为:

  • 尝试获取队列列表

    • 从本地缓存中获取,获取到返回
    • 没有获取到。从NameSrv中获取,获取到返回
    • 没有获取到。获取TBW102的队列,获取到返回。后续会有自动创建Topic的逻辑
  • 选择其中一个队列 selector 策略

    关于异步消息

    异步消息需要我们单独加一个回调方法,添加在发送消息成功/失败的一些处理。
    因为异步消息没有对Broker回来的结果进行额外的处理,那么自然我们就不能像同步消息一样,对Broker返回回来的结果单独针对SendResult进行单独的重试操作。所以需要我们在失败的回调方法上进行额外的处理(例如重试消息发送)
    具体的原因,我们可以看下消息发送的主干逻辑

    消息发送的主干逻辑

    另外我在这里贴一下消息发送的主干代码(具体的代码和它们的注释可以看下我的github 我是一个超链接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    // ... 省略部分代码
    // 查找主题路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
    // ... 省略部分代码
    for (; times < timesTotal; times++) {
    // 上次发送的broker名称
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // 选择一条messagequeue(轮询+失败规避)
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
    // ... 省略部分代码
    try {
    // ... 省略部分代码
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    endTimestamp = System.currentTimeMillis();
    // 失败规避
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
    switch (communicationMode) {
    case ASYNC:
    return null;
    case ONEWAY:
    return null;
    case SYNC:
    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    // 发送失败重试
    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
    continue;
    }
    }

    return sendResult;
    default:
    break;
    }
    } catch (/** 一系列异常处理 **/) {

    }
    }
    }
    }

    流程:

  • 尝试获取Topic路由信息

  • 选择一个队列(默认是轮询+失败规避)

  • 发送消息,失败会重试

    失败规避

    上面提到了失败规避,到底什么是失败规避呢?
    在我们一次消息发送过程中,消息有可能发送失败。在消息发送失败,重试时选择发送消息的队列时,就会规避上次MessageQueue所在的Broker。这样能够减少很多不必要的请求(因为Broker宕机后,很大情况下这个Broker短时间内依旧是无法使用的)
    那么,为什么会有宕机的Broker在我们的内存中存在?
    因为NameSrv是根据心跳检测来确定Broker是否可用的(有间隔 10s),且消息生产者更新路由信息也是有间隔的(30s)。且为了Namesrv设计的简单,Namesrv不会主动将Broker宕机的信息推给消息生产者,而是需要消息生产者定时更新的时候,才会感知到Broker宕机。
    在这期间存在误差,所以我们是要一个机制(失败规避策略)来减少一些不必要的性能消耗

另外,失败规避默认是关闭的

1
2
3
4
5
6
7
8
9
private boolean sendLatencyFaultEnable = false;

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 默认以30s作为computeNotAvailableDuration的参数
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}

关于发送端消息的可靠性

发送端消息的可靠性主要是靠消息发送重试
RocketMQ的架构中,可能会存在多个Broker为某个topic提供服务,这个topic的消息被存放在多个Broker下。(有点类似于Redis的Cluster)
当生产者往某个Broker发送消息失败时,会进行失败规避,选择其他提供服务的Broker,进行发送消息。确保消息能够被稳定的送到Broker中

消息重新发送,消息的msgid会发生变化吗

msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;

1
2
3
4
5
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
// 为消息分配唯一的全局id
MessageClientIDSetter.setUniqID(msg);
}
1
2
3
4
5
6
7
8
public static void setUniqID(final Message msg) {
// 因为有了这个判读,所以我们重试的时候,消息id是不会发生变化的
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}

public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";

但是,offsetMsgId是会发生变化的
RocketMQ基础篇 Producer发送消息 msgid.png

  • msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;
  • offsetMsgId,offsetMsgId是由Broker服务端在写入消息时生成的(采用”IP地址+Port端口”与“CommitLog的物理偏移量地址”做了一个字符串拼接),其中offsetMsgId就是在RocketMQ控制台直接输入查询的那个messageId。

    消息发送的时候,如何自己选择消息推送的队列

    发送消息的时候可以自定义一个MessageQueueSelector,就可以自己选推到那个队列,实现顺序消息的需求了
    1
    2
    3
    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
    final SendCallback sendCallback) throws MQClientException, RemotingException,
    InterruptedException;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
    MessageQueue mq = null;
    try {
    List<MessageQueue> messageQueueList =
    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
    Message userMessage = MessageAccessor.cloneMessage(msg);
    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
    userMessage.setTopic(userTopic);

    mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
    } catch (Throwable e) {
    throw new MQClientException("select message queue threw exception.", e);
    }

    long costTime = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTime) {
    throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
    }
    if (mq != null) {
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
    } else {
    throw new MQClientException("select message queue return null.", null);
    }
    }

    validateNameServerSetting();
    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
    有一个注意点:这种基于selector方式的,发送消息失败是不会重试的
    原因:我的理解是我们自己自定义的selector,在没有规则策略的前提下,大概率还是选择到这个失败的队列里面。但是如果有规避策略的话,又和我们定义selector的本意违背了(例如我要实现顺序消息,结果失败重试把它丢到了其他队列,违背了顺序消息的本意)
  • Post title:RocketMQ基础篇Producer发送消息
  • Post author:大黄
  • Create time:2022-08-05 10:02:53
  • Post link:https://huangbangjing.cn/2022/08/05/RocketMQ基础篇Producer发送消息/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.