简介
Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。相比于 Kafka,其拥有更好的实时性和消息可靠性。更适用于和 Money 相关的系统。它支持如下特性:
- 订阅/发布模式的消息
支持消费组模式的消费,即一个消费组集群内只有一个实例会收到那一条消息。
- 延时消息
只支持特定 Level 的延时设置,默认有 “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h” 18个 Level。先扔到对应的延时队列,后台线程根据延时再将其挪到实际的 Topic 中。
- 顺序消息
只保证在单个 Broker 的单个 Queue 内是有序的,全局不保证有序。和 Kafka 一样.。
- 消息持久化
[主从同步 + 同步刷盘] 模式保证了持久数据的安全性。4.5 版本后加入 Dleger 更是支持了 主从自动切换。
- 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现,减少了r无用消息的网络传输。
- 消息回溯
已消费过的消息,可以根据时间或 key 等维度来重新消费。在处理系统环境异常时很有用。
- 事务消息
先发送到一个特殊的系统 Topic 中,然后利用 2PC + 事务回查机制,判断将消息转到真正的 Topic 还是抛弃。
- 死信队列
消费失败并重试一定次数还是失败的的消息会先放到死信队列,需要手动进行重发
- 消息重试
每个消费组有一个 “%RETRY%+consumerGroup” 的重试队列。重试的消息会按照延迟的时间先放到 “SCHEDULE_TOPIC_XXXX” 队列中,然后才会被保存至 “%RETRY%+consumerGroup” 的重试队列中。
- At-Least-Once
消息消费有 ACK 机制,消费结束才返回对应的 ACK 相应。和 Kafka 不太一样,Kafka 追求消息的大量快速处理,默认都是异步,整合成一批来消费生产的。
RoeketMQ 总的来说大致可以分为几个部分:NameServer, Broker Server, Client ( Producer and Consumer )。物理部署逻辑图如下:

NameServer:保存 Topic 路由信息,NameServer 实例之间不通信 Broker:Broker 有主从之分,Broker Name 相同的,BrokerId 为 0 的则为主服务器;每个 Broker 都需要向所有的 NameServer 注册,并周期性的向其发注册请求 Client(Consumer & Producer):周期性的去 NameServer 获取路由信息,周期性的向所有 Broker 发送心跳
NameServer
NameServer 有点类似于 Kafka 中 ZooKeeper 的作用,其充当一个路由注册中心,维护所有的 Broker 和 Topic 路由的信息。但是和 ZooKeeper 不同,集群内的 NameServer 之间是不通信的。 Broker 启动时需要向所有的 NameServer 实例注册,并定时向其发送心跳信息。因此,每个 NameServer 都是包含了所有的 Broker 和 Topic 的路由信息的,是一个完整的个体。
大致看下 NameServer 启动流程[基于代码版本 4.5.2]:
构建 NamesrvController –> NamesrvController 初始化 –> NamesrvController 启动
NamesrvController 初始化
1 | // NamesrvController 初始化 |
可以看出,其主要的行为就是:
- 启动对应的 RemotingServer 用于网络通信
- 注册其对应的 RequestProcessor 用于处理接收的特定请求
- 启动定时任务:扫描不活跃的 Broker 并移除之、打印对应的配置信息
RouteInfoManager 管理的数据:
1 | public class RouteInfoManager { |
NameServer 的 RequestProcessor
1 | public class NamesrvController { |
从上面可以看出,NameServer 主要处理如下几种类型的 Request,这也正是整个 NameServer 的作用:
- NameServer 的配置管理
- UPDATE_NAMESRV_CONFIG
- GET_NAMESRV_CONFIG
- kvConfig 的管理
- PUT_KV_CONFIG
- GET_KV_CONFIG
- DELETE_KV_CONFIG
- GET_KVLIST_BY_NAMESPACE
- Broker 的管理
- QUERY_DATA_VERSION
- REGISTER_BROKER
- UNREGISTER_BROKER
- GET_BROKER_CLUSTER_INFO
- WIPE_WRITE_PERM_OF_BROKER
- Topic 路由信息的管理
- GET_ROUTEINTO_BY_TOPIC
- GET_ALL_TOPIC_LIST_FROM_NAMESERVER
- DELETE_TOPIC_IN_NAMESRV
- GET_TOPICS_BY_CLUSTER
- GET_SYSTEM_TOPIC_LIST_FROM_NS
- GET_UNIT_TOPIC_LIST
- GET_HAS_UNIT_SUB_TOPIC_LIST
- GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST
NameServer 处理 REGISTER_BROKER 请求
Broker 向 NameServer 注册,类似于充当了一个心跳的作用。从 BrokerController 中可以看出,Broker 启动的时候就需要向所有的 NameServer 发送 REGISTER_BROKER 请求去注册自己的相关信息,并定默认时每 30s 再去注册一次。 结合 NameServer 的 scanNotActiveBroker 定时任务,NameServer 便可以维护一个较为实时的 Broker 信息列表。
1 | public class BrokerController { |
再看看 NameServer 如何处理该请求,大体流程如下:
根据请求解析出请求头和请求体
- 请求头信息包含:brokerName、brokerAddr、clusterName、haServerAddr、 brokerId、compressed
- 请求体包含:filterServerList、TopicConfigSerializeWrapper[ConcurrentMap<String, TopicConfig> topicConfigTable、DataVersion dataVersion]
根据请求头和请求体去更新 NameServer 本地维护的 clusterAddrTable、brokerAddrTable、brokerLiveTable、filterServerTable 等信息
看看其主要行为:
1 | public class RouteInfoManager { |
Client
RocketMQ 的 Client 从 NameServer 获取路由信息,并定时向所有的 Broker 发送心跳信息。
生产者
启动
生产者的话,我们主要从构造 DefaultMQProducerImpl
开始,随后就是启动该生产者了。构造函数主要是对 Executor 的一个初始化,暂且不看,我们直接看看其启动方法做的事情:
- 注册 Producer Group
- 调用
MQClientInstance
的 start 方法- 如果没有配置 NameServer 地址,首先获取 NameServerAddr
- 启动网络应答模块 request-response channel
- 启动系列周期定时任务
- 每两分钟获取 NameServerAddr
- 默认每 30s 从 NameServer 获取 Topic 路由信息
- 默认每 30s 向所有 Broker 发送心跳信息
- 默认每 5s 提交持久化消费 offset 记录
- 每分钟调整一次线程池
- 启动拉取消息的线程服务
- 启动 reblance 的线程服务
- 向所有 Broker 发送心跳信息 并上传对应的 Filter class 信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57public class DefaultMQProducerImpl implements MQProducerInner {
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// ... 省略部分代码(用于状态调整的,只需启动一次即可)
// 创建 or 获取 MQClientInstance 实例,一个客户端只有一个 MQClientInstance 实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册对应的消费 Group
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
// 启动 MQClientInstance
if (startFactory) {
mQClientFactory.start();
}
// ...
break;
}
// 向所有的 Broker 发送心跳信息,同时将 filter class 上传到 Filter Server
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
public class MQClientInstance {
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
Produce
对于生产消息,有几点需要注意 [代码较多就不贴了]
发送消息时如何选择 Broker 和 其中的 queue
大致就是根据 Topic 获取路由信息 -> 取随机数(后续则取得是之前的随机数+1)后取模获取 MessageQueue -> LatencyFaultTolerance 中判断其是否有效,有效则选择该 MessageQueue;否则的话,从 LatencyFaultTolerance 中选取一个 Broker;如果该 Broker 有的 writeQueueNums > 0, 则取模选其中的一个 queue;否则重新走一遍流程。
其中 LatencyFaultTolerance 的核心就是针对之前失败的请求,按照一定时间来做退避。即短时间内不再选取其作为发送目的地发送的 Topic 如果还未创建的话是如何处理的
如果 Topic 还未创建的话,这个时候从本地或是 NameServer 都是没办法根据该 Topic Name 获取到对应的路由信息的。 不过 RocketMQ 是可以支持自动创建 Topic 的(生产不建议打开就是了)。 其实际就是获取所有打开了
autoCreateTopic
配置的 Broker 的 默认 Topic: TBW102 (如果autoCreateTopic=true,该 Topic 会在 Broker 启动的时候自动被注册到 NameServer)的路由信息信息。但是这么做可能会导致消息的负载不均衡,我们以发送一个 UNKNOWN_TOPIC 为例说明下(开启了 autoCreateTopic):
- 从 NameServer 获取 UNKNOWN_TOPIC 路由信息,但获取不到
- 获取 TBW102 路由信息
- 选择 Broker & Queue 发送消息到对应的 Broker
- Broker 收到消息后做对应的存储持久化,并将该 UNKNOWN_TOPIC 注册到 NameServer(此时 NameServer 含有 UNKNOWN_TOPIC 的路由信息了,但是只有该 Broker的)
- 客户端后台定时任务从 NameServer 获取路由信息并更新本地的记录
假如在步骤5执行前,只有一条消息发送到了一个 Broker,那么此后岂不是该 Topic 的所有信息只能发送到这一个 Broker了,就失去了负载均衡的效果了。
- 发送的消息是事务消息时如何处理的

RT,流程如图所示。总的来说是有点类似于 2PC 的一个模式。但是加了个回查机制,这样可以处理网络请求的未知状态问题。 首次发送的消息属于 Half 消息,包含了消息的所有信息,但是其并不会直接发送到对应的 Topic 中去,而是会发送到名为 RMQ_SYS_TRANS_HALF_TOPIC 的系统 Topic 中。Broker 会有定时任务去检查该 Topic 中还未处理的的消息,然后去回查事务的状态,判断该事务是需要 commit (转移到真实的 Topic Queue 中)还是 rollback;当然回查不会是无线的,默认是回查 15 次没结果的话就会回滚。 那么如何才能知道该消息是否处理了? RocketMQ 的做法是额外引入了个 Op 消息,就是对于每个 Message,在 Commit or Rollback 后,都会有一条对于的 Op 信息,没有的话就说明该消息还未处理完成。 (为啥不使用拓展属性字段来表示?) 详细的可以查看 RocketMQ GitHub 上的设计文档:设计(design)
消费者
消费的启动整体和生产者类似。其中需要比较需要注意的是其内部的 Rebalance 的实现。这是一个后台定时任务,在启动 MQInstance 的时候就启动了。
Rebalance
我们这里主要看看针对集群消费模式的 Rebalnace。总的来说就是 RocketMQ 有个后台线程周期性的在执行 Rebanlance 的任务,默认为 20000ms。执行 Rebalance 实际是按照 Topic 来划分的,具体针对单个 Topic 进行 Rebalance 的流程大致如下:
- 根据 Topic 获取其当前对应的 MessageQueue 和 Consumer ID
- 根据配置的分配策略对数据进行分配处理,得出当前 Consumer 的消费 Mssageueue 列表。总共有如下几种分配策略
- AllocateMachineRoomNearby
- AllocateMessageQueueAveragely(默认)
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueConsistentHash 策略讲解参考 Blog:RocketMQ-负载均衡
- 根据新的分配结果调整本地的消费分配数据 processQueueTable (
ConcurrentMap<MessageQueue, ProcessQueue>
) - 如果本地消费分配缓存数据有调整,则调整本地拉取消息的线程任务;删除失效的 MessageQueue 相关拉取任务,添加新的 MessageQueue 相关拉取任务
代码如下:
1 | public abstract class RebalanceImpl { |
Broker Server
整体设计架构
作为 RocketMQ 的核心部分,我们首先看看 Broker 其大体的一个架构。说有模块的都是基于其 Romoting Module 网络通信模块来实现的。

- Client Manager:管理客户端并维护 Consumer 的 Topic 订阅信息
- Store Service:负责消息存储服务
- HA Service: 负责主从 Broker 的数据同步
- Index Service:负责根据消息 Key 对其进行索引的服务
消息文件:
- CommitLog:消息数据的实际存储记录
- ConsumeQueue:消息消费队列,提高消息消费的性能。存储路径为:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
,保存了指定 Topic下 的队列消息在 CommitLog 中的起始物理偏移量,消息大小和消息 Tag 的 HashCode 值。 - IndexFile:消息索引文件,方便通过 key 或时间区间来查询消息。存储路径为:
$HOME\store\index${fileName}
其中 ConsumeQueue 和 IndexFile 是后台线程根据 CommitLog 异步生成的。

启动
- initialize
- 加载 topics.json、consumerOffset.json、subscriptionGroup.json、consumerFilter.json 文件恢复之前存储的相关数据
- 初始化 ThreadPoolExecutor
- 注册对应请求的处理器
- SendMessageProcessor: SEND_MESSAGE、SEND_MESSAGE_V2、SEND_BATCH_MESSAGE、CONSUMER_SEND_MSG_BACK
- PullMessageProcessor: PULL_MESSAGE
- QueryMessageProcessor: QUERY_MESSAGE、VIEW_MESSAGE_BY_ID
- ClientManageProcessor: HEART_BEAT、UNREGISTER_CLIENT、CHECK_CLIENT_CONFIG
- ConsumerManageProcessor: GET_CONSUMER_LIST_BY_GROUP、UPDATE_CONSUMER_OFFSET、QUERY_CONSUMER_OFFSET
- EndTransactionProcessor: END_TRANSACTION
- AdminBrokerProcessor: 命令较多,大多都是些和 Broker 相关的配置和状态获取调整的命令
- 启动一些打印 Broker 运行相关信息的后台周期性线程:consumerOffset 持久化的周期性线程( 默认 5s 一次)、consumerFilter 持久化的周期性线程( 10s 一次)、Broker 保护启动探测线程( 如果有开启的话[默认 false],消费者消费太慢会被禁止消费)
- 获取 NameServer 地址 和 SSL 、ACL、Deleger、Transaction 等的初始化
- start
- 启动各模块服务,messageStore、remotingServer、filterServerManager、brokerOuterAPI、brokerStatsManager等
- 启动周期性任务(默认 30s 一次;最小间隔 10s,最大间隔 60s ):向所有的 NameServer 注册该 Broker 的 Topic 等信息
处理请求
上面已经说了,每个请求都有对应的 Processor;这里我们就从 SEND_MESSAGE 的请求来看看其实如何处理的,这里我们只看最简单的单个消息发送处理的逻辑:
1 | public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { |
大体流程总结如下:
- 根据 RequestCode 判断是否是重试发送的消息以及消息的类型:批量发送、事务消息 or 单条消息
- 构造并校验消息数据:topic是否存在、是否与系统默认 topic 冲突、是否可写、broker 是否可写等
- 根据是否是事务消息执行
transactionalMessageService.prepareMessage
ormessageStore.pusMessage
messageStore.pusMessage
中校验消息数据大小是否超标、Broker 是否为主等后调用commitLog.putMessage(msg)
- commitLog 中最终将 msg append 到 MappedFile 中
commitLog.handleDiskFlush
根据FlushDiskType
来执行对应的 Flush 行为(取决于是同步刷盘还是异步刷盘)commitLog.handleHA
根据主从同步模式执行相关行为