RocketMQ基础篇Broker存储消息
大黄 Lv4

Broker是如何存储消息的

流程图

RocketMQ基础篇 Broker存储消息 流程图.png

代码解释

写入CommitLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery 延时等级 > 0,替换原有的消费主题为 SCHEDULE_TOPIC_XXXX,队列id 为 延时的等级-1
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}

InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}

PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;

// 申请 putMessageLock 锁 (将消息存储到CommitLog文件中是串行的)
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 获取当前可以写入的CommitLog文件
// CommitLog 存储地址:${ROCKET_HOME}/store/commitlog 文件默认大小1g。一个文件写满后就再创建另一个文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;

// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);

// ${ROCKET_HOME}/store/commitlog 目录下没有任何文件
if (null == mappedFile || mappedFile.isFull()) {
// 以偏移量为0 创建commitLog文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}

result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}

elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 处理完追加逻辑就会释放锁
putMessageLock.unlock();
}

if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

// 刷盘
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 此处会处理主从同步的结果(HA)
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}

从代码中,我们可以看到写入CommitLog这个操作主要做了以下几件事情

  1. 设置存储的消息的基本信息
    1. 如果消息是延时消息,会将原有的消息topic替换为 SCHEDULE_TOPIC_XXXX,队列id为延迟等级
  2. 申请putMessageLock(该操作确保了只会有一个线程去对CommitLog进行修改)
  3. 如果CommitLog不存在或者已经写满,需要创建新的CommitLog
  4. 往MappedFile中追加消息(追加消息的时候才会生成消息唯一ID)
    1. 如果文件剩余空间不足,会创建新的文件 (消息长度 + END_FILE_MIN_BLANK_LENGTH > CommitLog文件空闲空间,返回END_OF_FILE)
    2. 其他的一些异常
  5. 追加完消息,释放锁

那么,这边来解释下CommitLog和MappedFile的关系

CommitLog和MappedFile的关系

CommitLog里面有什么

RocketMQ基础篇 Broker存储消息 CommitLog结构.png
CommitLog里面记录了消息的完整内容,我们在读取消息的时候,首先先通过前4个字节记录了当前消息的实际长度,然后再往后读对应的长度,就可以将消息完全读取出来
CommitLog文件在磁盘中的存储路径 ${ROCKET_HOME}/store/commitlog/

CommitLog和MappedFile的关系

RocketMQ采用内存映射文件的方式来提高IO访问性能,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设置为固定长度。然后RocketMQ使用MappedFile和MappedFileQueue来封装存储文件。具体对MappedFile和MappedFileQueue的概念我会在后面在具体介绍

CommitLog与ConsumeQueue,IndexFile的关系

既然要探讨ConsumeQueue和IndexFile与CommitLog之间的关系,不如直接来讲一讲这两个文件是干什么用的,从他们的用处中我们也就能够得知为什么在已经有了CommitLog的情况下,RocketMQ还需要ConsumeQueue和IndexFile这两种文件

ConsumeQueue

首先,我们来看看ConsumeQueue的结构

ConsumeQueue的结构

RocketMQ基础篇 Broker存储消息 ConsumeQueue目录结构.png
RocketMQ基础篇 Broker存储消息 ConsumeQueue结构.png
从图片中,我们不难看出,RocketMQ对ConsumeQueue的层级目录为
ConsumeQueue - 具体的Topic - 具体的某一个队列 - ConsumeQueue的单元 (记录了消息在CommitLog中的偏移量,消息的长度,消息的tags)

为什么需要CommitLog和ConsumeQueue的映射关系

那么为什么RocketMQ要设计这样的映射关系呢?主要要从两个方面来考虑,同时这两个方面最终的目的也是为了保证RocketMQ的性能

  • 写入消息:如果我们没有CommitLog,直接就用一个ConsumeQueue,那么在Producer生产消息给Broker的时候,不同的Topic和队列的消息我们要写到不同的ConsumeQueue文件中。那么就会存在随机写的问题,这样写入消息的效率就会变的非常的低
  • 读取消息:如果为了保证消息的写入效率,我们将消息存放在CommitLog中,但是,在消费者拉取消息的时候,它肯定是要拉取自己感兴趣的Topic的消息。此时,我们去CommitLog中寻找对应的Topic和对应MessageQueue下对应偏移量的消息,挨个去遍历CommitLog一看就是一个效率非常低的操作。此时我们使用ConsumeQueue就可以快速定位到消息处于哪个CommitLog,他对应的偏移量是多少。这样读取消息的效率就会变得非常的高

    ConsumeQueue的tag hashcode

    这边补充一点,关于ConsumeQueue中的tag hashcode。
    在RocketMQ中,我们不仅可以监听我们感兴趣的Topic消息,同时我们还可以只监听Topic下部分的消息。这个操作就是通过消息的tag来实现的(一种实现方式,另外还有类过滤和SQL92过滤的方式)【RocketMQ服务端过滤根据tag的hashcode】
    在消费者拉取消息的时候,不仅消费者自己会对消息进行过滤,Broker也会根据消费者的情况对消息进行一次预过滤。此处我们就不展开讲述,具体可以消息消费 Consumer消费消息 的文章

IndexFile

IndexFile的结构

RocketMQ基础篇 Broker存储消息 IndexFile结构.png
他的目的是为了方便我们根据消息的key快速检索消息,快速检索主要是依靠文件头部的Hash槽。

哈希冲突的解决

既然说到了hash,那么就一定会存在哈希冲突的情况。那么,IndexFile是如何解决哈希冲突的呢?
在我们写IndexFile的时候,假如IndexFile还有足够的空间,那么我们就会对当前这条消息的消息key进行一次哈希计算,假如对应的hash槽中没有记录对应的index条目偏移量,那么直接将偏移量记录到hash槽中。否则这是在Index条目中的pre index no记录原先的偏移量,然后再将这个条目的偏移量记录到hash槽中

对于为什么要设计CommitLog,ConsumeQueue,IndexFile的总结

因为消费者消费消息的时候,是根据具体的Topic去进行消费的。所以为了加快消费者消费的速度,需要根据具体的Topic去存储消息(ConsumeQueue)。但是如果直接把消息存储在ConsumeQueue,会导致生产者发送的消息Topic不同,出现大量随机写的情况。所以为了提高消费者和生产者的速度。相同的,为了加快对消息key检索消息的速度,所以添加了IndexFile
RocketMQ首先先将生产者的消息写入到CommitLog中(顺序写),然后在通过一个异步线程将消息基本信息(偏移量等)写入到ConsumeQueue和IndexFile中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ReputMessageService extends ServiceThread {
// ...省略其他代码
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
// 每执行一次任务推送休息1毫秒,就继续尝试推送消息到消息消费队列和索引文件
Thread.sleep(1);
// 消息消费转发核心方法
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
// ...省略其他代码
}

// 最终调用到该方法
public void doDispatch(DispatchRequest req) {
// CommitLogDispatcher 有三个实现
// 其中就有我们用到的 CommitLogDispatcherBuildConsumeQueue 写ConsumeQueue
// CommitLogDispatcherBuildIndex 写IndexFile
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}

MappedFile和MappedFileQueue

MappedFile

为什么需要MappedFile

RocketMQ底层使用 mmap + write (零拷贝)的方式减少用户态和内存态的切换次数
具体内容参考 mmap解释 这篇文章
这里只简单介绍一下:
加入没有mmap,我们读取文件需要经历两次数据拷贝,一次是从磁盘拷贝到page cache中,一次从page cache拷贝到用户空间内存。
而有了mmap,就能够减少page cache到用户空间内存的拷贝mmap对page cache和用户空间虚拟地址进行了直接的映射操作虚拟地址就等同于操作page cache

MappedFileQueue

MappedFileQueue是MappedFile的管理容器,它是对存储目录的封装
比如说在CommitLog文件存储目录下有好多个CommitLog文件,对应在内存中也会有好多个MappedFile对象,而MappedFileQueue就是来管理这些MappedFile对象的

MappedFile刷盘操作

消息刷盘有两种:

  • 同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  • 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

RocketMQ定期删除已经消费的消息文件

另外,RocketMQ会定时将已经消费的消息从存储文件中删除,以CommitLog为例,第一个CommitLog文件的偏移量不一定 00000000000000000000

主干逻辑流程图

RocketMQ基础篇 Broker存储消息 定期删除CommitLog.png

对应源码和注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 默认每个文件过期时间为72小时,通过Broker配置文件中设置fileReservedTime来改变
// 如果非当前写文件在一定时间间隔内没有再次被更新,则会被认为是过期文件,可以被删除(RocketMQ不会关注这个文件上的消息是否被全部消费)
private void deleteExpiredFiles() {
int deleteCount = 0;
// 文件保留时间 72小时
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 删除物理文件的间隔(在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的时间间隔)
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务
// 同时在第一次试图删除该文件时记录当前时间戳 destroyMapedFileIntervalForcibly 表示第一次拒绝删除之后能保留的最大时间
// 在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

// 指定删除文件的时间点
boolean timeup = this.isTimeToDelete();
// 磁盘空间满了
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

if (timeup || spacefull || manualDelete) {

if (manualDelete)
this.manualDeleteFileSeveralTimes--;

boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);

fileReservedTime *= 60 * 60 * 1000;

// 清除过期文件
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);

if (null == mfs)
return 0;

// 从倒数第二个文件开始
int mfsLength = mfs.length - 1;
int deleteCount = 0;
// 通过MappedFile,没有磁盘i/o的情况
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 文件最大存活时间=文件的最后一次更新时间+文件存活时间(默认72小时)
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 当前时间>=最大存活时间 或 需要立即删除
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 删除文件(将该文件加入到待删除文件列表中,然后统一执行File#delete方法将文件从物理磁盘中删除)
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;

if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}

if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}

deleteExpiredFile(files);

return deleteCount;
}
  • Post title:RocketMQ基础篇Broker存储消息
  • Post author:大黄
  • Create time:2022-08-05 10:10:16
  • Post link:https://huangbangjing.cn/2022/08/05/RocketMQ基础篇Broker存储消息/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.