Broker是如何存储消息的
流程图
代码解释 写入CommitLog 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 public CompletableFuture<PutMessageResult> asyncPutMessage (final MessageExtBrokerInner msg) { msg.setStoreTimestamp(System.currentTimeMillis()); msg.setBodyCRC(UtilAll.crc32(msg.getBody())); AppendMessageResult result = null ; StoreStatsService storeStatsService = this .defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { if (msg.getDelayTimeLevel() > 0 ) { if (msg.getDelayTimeLevel() > this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); if (bornSocketAddress.getAddress() instanceof Inet6Address) { msg.setBornHostV6Flag(); } InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); if (storeSocketAddress.getAddress() instanceof Inet6Address) { msg.setStoreHostAddressV6Flag(); } PutMessageThreadLocal putMessageThreadLocal = this .putMessageThreadLocal.get(); PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); if (encodeResult != null ) { return CompletableFuture.completedFuture(encodeResult); } msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer); PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); long elapsedTimeInLock = 0 ; MappedFile unlockMappedFile = null ; putMessageLock.lock(); try { MappedFile mappedFile = this .mappedFileQueue.getLastMappedFile(); long beginLockTimestamp = this .defaultMessageStore.getSystemClock().now(); this .beginTimeInLock = beginLockTimestamp; msg.setStoreTimestamp(beginLockTimestamp); if (null == mappedFile || mappedFile.isFull()) { mappedFile = this .mappedFileQueue.getLastMappedFile(0 ); } if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0 ; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null )); } result = mappedFile.appendMessage(msg, this .appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break ; case END_OF_FILE: unlockMappedFile = mappedFile; mappedFile = this .mappedFileQueue.getLastMappedFile(0 ); if (null == mappedFile) { log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0 ; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } result = mappedFile.appendMessage(msg, this .appendMessageCallback, putMessageContext); break ; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0 ; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: beginTimeInLock = 0 ; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); default : beginTimeInLock = 0 ; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } elapsedTimeInLock = this .defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0 ; } finally { putMessageLock.unlock(); } if (elapsedTimeInLock > 500 ) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}" , elapsedTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this .defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this .defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1 ); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) { log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}" , msg.getTopic(), msg.getTags(), msg.getBornHostNameString()); } } return putMessageResult; }); }
从代码中,我们可以看到写入CommitLog这个操作主要做了以下几件事情
设置存储的消息的基本信息
如果消息是延时消息,会将原有的消息topic替换为 SCHEDULE_TOPIC_XXXX,队列id为延迟等级
申请putMessageLock(该操作确保了只会有一个线程去对CommitLog进行修改)
如果CommitLog不存在或者已经写满,需要创建新的CommitLog
往MappedFile中追加消息(追加消息的时候才会生成消息唯一ID)
如果文件剩余空间不足,会创建新的文件 (消息长度 + END_FILE_MIN_BLANK_LENGTH > CommitLog文件空闲空间,返回END_OF_FILE)
其他的一些异常
追加完消息,释放锁
那么,这边来解释下CommitLog和MappedFile的关系
CommitLog和MappedFile的关系 CommitLog里面有什么 CommitLog里面记录了消息的完整内容 ,我们在读取消息的时候,首先先通过前4个字节记录了当前消息的实际长度,然后再往后读对应的长度,就可以将消息完全读取出来 CommitLog文件在磁盘中的存储路径 ${ROCKET_HOME}/store/commitlog/
CommitLog和MappedFile的关系 RocketMQ采用内存映射文件的方式来提高IO访问性能 ,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设置为固定长度。然后RocketMQ使用MappedFile和MappedFileQueue来封装存储文件 。具体对MappedFile和MappedFileQueue的概念我会在后面在具体介绍
CommitLog与ConsumeQueue,IndexFile的关系 既然要探讨ConsumeQueue和IndexFile与CommitLog之间的关系,不如直接来讲一讲这两个文件是干什么用的,从他们的用处中我们也就能够得知为什么在已经有了CommitLog的情况下,RocketMQ还需要ConsumeQueue和IndexFile这两种文件
ConsumeQueue 首先,我们来看看ConsumeQueue的结构
ConsumeQueue的结构 从图片中,我们不难看出,RocketMQ对ConsumeQueue的层级目录为 ConsumeQueue - 具体的Topic - 具体的某一个队列 - ConsumeQueue的单元 (记录了消息在CommitLog中的偏移量,消息的长度,消息的tags)
为什么需要CommitLog和ConsumeQueue的映射关系 那么为什么RocketMQ要设计这样的映射关系呢?主要要从两个方面来考虑,同时这两个方面最终的目的也是为了保证RocketMQ的性能
写入消息:如果我们没有CommitLog,直接就用一个ConsumeQueue,那么在Producer生产消息给Broker的时候,不同的Topic和队列的消息我们要写到不同的ConsumeQueue文件中。那么就会存在随机写的问题,这样写入消息的效率就会变的非常的低
读取消息:如果为了保证消息的写入效率,我们将消息存放在CommitLog中,但是,在消费者拉取消息的时候,它肯定是要拉取自己感兴趣的Topic的消息。此时,我们去CommitLog中寻找对应的Topic和对应MessageQueue下对应偏移量的消息,挨个去遍历CommitLog一看就是一个效率非常低的操作。此时我们使用ConsumeQueue就可以快速定位到消息处于哪个CommitLog,他对应的偏移量是多少。这样读取消息的效率就会变得非常的高ConsumeQueue的tag hashcode 这边补充一点,关于ConsumeQueue中的tag hashcode。 在RocketMQ中,我们不仅可以监听我们感兴趣的Topic消息,同时我们还可以只监听Topic下部分的消息。这个操作就是通过消息的tag来实现的(一种实现方式,另外还有类过滤和SQL92过滤的方式)【RocketMQ服务端过滤根据tag的hashcode】 在消费者拉取消息的时候,不仅消费者自己会对消息进行过滤,Broker也会根据消费者的情况对消息进行一次预过滤。此处我们就不展开讲述,具体可以消息消费 Consumer消费消息 的文章
IndexFile IndexFile的结构 他的目的是为了方便我们根据消息的key快速检索消息,快速检索主要是依靠文件头部的Hash槽。
哈希冲突的解决 既然说到了hash,那么就一定会存在哈希冲突的情况。那么,IndexFile是如何解决哈希冲突的呢? 在我们写IndexFile的时候,假如IndexFile还有足够的空间,那么我们就会对当前这条消息的消息key进行一次哈希计算,假如对应的hash槽中没有记录对应的index条目偏移量,那么直接将偏移量记录到hash槽中。否则这是在Index条目中的pre index no记录原先的偏移量,然后再将这个条目的偏移量记录到hash槽中
对于为什么要设计CommitLog,ConsumeQueue,IndexFile的总结 因为消费者消费消息的时候,是根据具体的Topic去进行消费的。所以为了加快消费者消费的速度,需要根据具体的Topic去存储消息(ConsumeQueue)。但是如果直接把消息存储在ConsumeQueue,会导致生产者发送的消息Topic不同,出现大量随机写的情况。所以为了提高消费者和生产者的速度。相同的,为了加快对消息key检索消息的速度,所以添加了IndexFile RocketMQ首先先将生产者的消息写入到CommitLog中(顺序写),然后在通过一个异步线程将消息基本信息(偏移量等)写入到ConsumeQueue和IndexFile中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class ReputMessageService extends ServiceThread { @Override public void run () { DefaultMessageStore.log.info(this .getServiceName() + " service started" ); while (!this .isStopped()) { try { Thread.sleep(1 ); this .doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this .getServiceName() + " service has exception. " , e); } } DefaultMessageStore.log.info(this .getServiceName() + " service end" ); } } public void doDispatch (DispatchRequest req) { for (CommitLogDispatcher dispatcher : this .dispatcherList) { dispatcher.dispatch(req); } }
MappedFile和MappedFileQueue MappedFile 为什么需要MappedFile RocketMQ底层使用 mmap + write (零拷贝)的方式减少用户态和内存态的切换次数 具体内容参考 mmap解释 这篇文章 这里只简单介绍一下: 加入没有mmap,我们读取文件需要经历两次数据拷贝,一次是从磁盘拷贝到page cache中,一次从page cache拷贝到用户空间内存。 而有了mmap,就能够减少page cache到用户空间内存的拷贝 ,mmap对page cache和用户空间虚拟地址进行了直接的映射 ,操作虚拟地址就等同于操作page cache
MappedFileQueue MappedFileQueue是MappedFile的管理容器,它是对存储目录的封装 比如说在CommitLog文件存储目录下有好多个CommitLog文件,对应在内存中也会有好多个MappedFile对象,而MappedFileQueue就是来管理这些MappedFile对象的
MappedFile刷盘操作 消息刷盘有两种:
同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
RocketMQ定期删除已经消费的消息文件 另外,RocketMQ会定时将已经消费的消息从存储文件中删除,以CommitLog为例,第一个CommitLog文件的偏移量不一定 00000000000000000000
主干逻辑流程图
对应源码和注释 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private void deleteExpiredFiles () { int deleteCount = 0 ; long fileReservedTime = DefaultMessageStore.this .getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this .getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this .getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean timeup = this .isTimeToDelete(); boolean spacefull = this .isSpaceToDelete(); boolean manualDelete = this .manualDeleteFileSeveralTimes > 0 ; if (timeup || spacefull || manualDelete) { if (manualDelete) this .manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this .getMessageStoreConfig().isCleanFileForciblyEnable() && this .cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}" , fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000 ; deleteCount = DefaultMessageStore.this .commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0 ) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed." ); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public int deleteExpiredFileByTime (final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { Object[] mfs = this .copyMappedFiles(0 ); if (null == mfs) return 0 ; int mfsLength = mfs.length - 1 ; int deleteCount = 0 ; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0 ; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break ; } if (deleteFilesInterval > 0 && (i + 1 ) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break ; } } else { break ; } } } deleteExpiredFile(files); return deleteCount; }