type
status
date
slug
summary
tags
category
icon
password
RocketMQ存储的文件主要包括Comitlog 文件、 ConsumeQueue 文件、 IndexFile 文件。
- CommitLog :消息存储文件,所有消息主题的消息都存储在 CommitLog 文件中 。
- ConsumeQueue :消息消费队列,消息到达 CommitLog 文件后,将异步转发到消息消费队列,供消息消费者消费 。
- IndexFile :消息索引文件,主要存储消息 Key 与 Offset 的对应关系 。以下是核心类DefaultMessageStore的成员变量:
// 以下我们先不考虑事务消息和延迟消息的存储
该类的核心方法是putMessage(……),即消息存储的具体操作,它会先经过一系列判断,再进行下一步操作,即this.commitLog.putMessage(msg)。在这个方法中又会经历一系列判断,调用到result = mappedFile.appendMessage(msg, this.appendMessageCallback)。然后会继续判断,这个消息是单条消息还是批量消息,有点绕,那么最终写的关键操作在哪呢?。在这:
大家可自己去搜到这个方法,隐藏在一个内部类里的。在这个方法里,做了几件事:
1.判断消息长度是否越界
- 检查当前commitLog的空余大小是否足够(不够则新建一个,获取上一个文件尾部偏移量,这里是一个递归调用)
- 足够则顺序写入,然后更新偏移量,返回写入OK。
- 还没完,接着看。
值得一提的是,这里面使用了原生nio的相关方法,按照ByteBuff一块一块地读取写入。
那么这里就扯到了一个关键问题,都说RocketMQ用了mmap零拷贝技术,可以让文件读写速度加快,在哪里提现呢?这里就要说到一个关键的类MappedFile(上面提到过)。
Broker会针对磁盘上的各种CommitLog、ConsumeQueue文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用MappedByteBuffer执行map()函数完成映射,这样后续读写文件的时候,就可以直接执行了。
MappedFile 往上一级是MappedFileQueu,它是一个管理容器, MappedFileQueue是对存储目录的封装,例如 CommitLog 文件的存储路${ROCKET_HOME}/store/commitlog/ ,该目录下会存在多个内存映射文件(MappedFile)。 通过这个类,可以根据时间查找对应的MappedFile,从而找到对应commitLog。
我们详细说一下MappedFile,它是RocketMQ内存映射文件,其中有几个成员变量很重要,就是指针,即哪几个以Posisition结尾的变量(不贴图了),通过几个指针灵活变动,实现高效顺序写入和读取。其中有个flush方法,commit线程会定时将内存中的数据通过mmap的方式,刷到磁盘中。