Janker
2022/09/04阅读:49主题:全栈蓝
RocketMQ核心概念及架构
RocketMQ核心概念及架构
简介
Apache RocketMQ是一个分布式消息和流式平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。
功能介绍
-
发布/订阅消息模型 -
金融级事务消息 -
各种跨语言客户端,如 Java
、C/C++、Python
、Go
-
可插拔传输协议,如 TCP
、SSL
、AIO
-
内置消息跟踪功能,还支持 opentracing
-
多功能大数据和流式生态系统集成 -
按时间或偏移量的消息追溯 -
同一队列中的可靠 FIFO
和严格有序消息传递 -
高效拉&推消费模型 -
单个队列中的百万级消息累积容量 -
多种消息传递协议,如 JMS
和OpenMessageing
-
灵活的分布式扩展部署体系结构 -
闪电式批量信息交换系统 -
各种消息过滤器机制,如 SQL
和Tag
-
用于隔离测试和云隔离群集的 Docker
映像 -
功能丰富的管理仪表板,用于配置、度量和监控 -
认证和授权
列举了RocketMQ
的这么多功能,其实总结就几个关键词,「发布/订阅消息模型」(貌似消息中间件都有的功能)、「事务消息」、「跨语言客户端」、「传输协议可插拔」、「消息跟踪」、「大数据」、「大容量」、「消息过滤」、「单队列有序消息」、等等各种丰富的功能,反正就是厉害,就是nb,跟手机发布会上的台词是一样一样的。
概念和特性
核心概念
Producer(生产者)
负责生产消息,生产者将业务应用程序系统生成的消息发送给代理(broker)。RocketMQ提供了多种发送模式:同步、异步和单向。
Producer Group(生产者组)
相同角色的生产者被分组在一起。代理(broker)可以联络同一生产者组的不同生产者实例,以提交或回滚事务,以防原始生产者在事务之后崩溃。
「注意」:考虑到提供的生产者在发送消息方面足够强大,每个生产者组只允许一个实例,以避免不必要的生产者实例初始化。
Consumer(消费者)
消费者从代理(broker)中提取消息并将其提供给应用程序。从用户应用的角度来看,提供了两种类型的消费者 :拉取式消费、推动式消费。
PullConsumer(拉取式消费)
Pull consumer主动从代理(broker)中提取消息。一旦拉取了一批消息,用户应用程序就会启动消费过程。
PushConsumer(推动式消费)
另一方面,Push consumer封装了消息拉取、使用进程和维护内部的其他工作,将回调接口留给最终用户实现,在消息到达时执行。
Consumer Group(消费者组)
与前面提到的生产者组类似,角色完全相同的消费者被分组在一起并命名为消费者组。
消费组是一个庞大的概念,在消息消费方面,实现负载平衡和容错的目标非常容易。
「注意」:消费者组的消费者实例必须具有完全相同的主题订阅。
Topic(主题)
主题是一个类别,生产者投递消息,消费者拉取消息。主题与生产者和消费者的关系非常松散。一个主题可能有零个、一个或多个生产者向其发送消息;相反,生产者可以发送不同主题的消息。在一个消费者的视角下,一个消费者组或多个消费者组可以订阅一个主题。同样,消费者组可以订阅一个或多个主题,只要该组的实例保持订阅一致。
Message(消息)
消息是要投递的信息。消息必须有一个主题,该主题可以解释为您的邮件地址。消息还可能具有可选Tag和额外的键值对。例如,您可以为消息设置一个业务密钥,并在代理(broker)服务器上查找该消息,以便在开发过程中诊断问题。
Message Queue(消息队列)
主题被划分为一个或多个子主题“消息队列”。
Tag(标签)
Tag
,换句话说就是子主题,为用户提供了额外的灵活性。使用Tag
,来自同一业务模块的具有不同目的的消息可能具有相同的主题和不同的Tag
。Tag
将有助于保持代码的整洁和一致性,Tag还可以方便RocketMQ
提供的查询系统。
Broker(代理服务器)
Broker
是RocketMQ
系统的主要组成部分。它接收生产者发送的消息,存储它们并准备处理消费者的请求。它还存储与消息相关的元数据,包括消费者组、消费进度偏移量和主题/队列信息。
Name Server (名称服务)
Name Server-名称服务器充当路由信息提供程序。生产者/消费者客户端查找主题以找到相应的代理列表。
Message Model(消息模型)
-
Clustering(集群模式)
集群模式消费,相同Consumer Group的每个Consumer实例平均分摊消息。
-
Broadcasting(广播模式)
广播模式消费,相同Consumer Group的每个Consumer实例都接收全量的消息。
Message Order(消息顺序)
当使用DefaultMQPushConsumer
时,您可以决定有序地或并发地使用消息。
-
顺序消息
有序消费消息意味着消息的消费顺序与生产者为每个消息队列发送消息的顺序相同。如果您正在处理全局顺序是必需的场景,请确保您使用的主题只有一个消息队列。
「注意」:如果指定“有序消费”,则消息消费的最大并发性是消费者组订阅的消息队列数,及单个消息队列里保证有序。
-
无序消息
当并发消费消息时,消息消费的最大并发性仅受为每个消费客户端指定的线程池的限制。
「注意」:在此模式下不再保证消息顺序。
特性
1 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
2 消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
-
全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景 -
分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
3 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
4 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
-
Broker非正常关闭 -
Broker异常Crash -
OS Crash -
机器掉电,但是能立即恢复供电情况 -
机器无法开机(可能是cpu、主板、内存等关键设备损坏) -
磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
5 至少一次
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
6 回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
7 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
8 定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
-
level == 0,消息为非延迟消息 -
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s -
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
9 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
-
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。 -
由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
-
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。 -
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。 -
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
11 流量控制
生产者流控,因为broker
处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
-
commitLog
文件被锁时间超过osPageCacheBusyTimeOutMills
时,参数默认为1000ms,返回流控。 -
如果开启 transientStorePoolEnable == true
,且broker
为异步刷盘的主机,且transientStorePool
中资源不足,拒绝当前send请求,返回流控。 -
broker
每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue
,默认200ms,拒绝当前send请求,返回流控。 -
broker
通过拒绝send
请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控:
-
消费者本地缓存消息数超过 pullThresholdForQueue
时,默认1000。 -
消费者本地缓存消息大小超过 pullThresholdSizeForQueue
时,默认100MB。 -
消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan
时,默认2000。
消费者流控的结果是降低拉取频率。
12 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ
将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message
),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue
)。在RocketMQ
中,可以通过使用console
控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
数据模型
数据模型对应关系如下:

-
一个 Topic
下对应多个消息 -
一个 Topic
可以对应多个Group
,反过来同一个group
也客户包含多个Topic
-
一个 Topic
下可以有多个queue
-
一个 Queue
对应一个offset
-
由3、4可以得出一个 Topic
可以对应多个offset
架构
技术架构

RocketMQ
架构上主要分为四部分,如上图所示:
-
Producer
:消息发布的角色,支持分布式集群方式部署。Producer
通过MQ的负载均衡模块选择相应的Broker
集群队列进行消息投递,投递的过程支持快速失败并且低延迟。 -
Consumer
:消息消费的角色,支持分布式集群方式部署。支持以push
推,pull
拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。 -
NameServer:NameServer
是一个非常简单的Topic
路由注册中心,其角色类似Dubbo
中的zookeeper
,支持Broker
的动态注册与发现。主要包括两个功能:Broker
管理,NameServer
接受Broker
集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer
将保存关于Broker
集群的整个路由信息和用于客户端查询的队列信息。然后Producer
和Conumser
通过NameServer
就可以知道整个Broker
集群的路由信息,从而进行消息的投递和消费。NameServer
通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker
是向每一台NameServer
注册自己的路由信息,所以每一个NameServer
实例上面都保存一份完整的路由信息。当某个NameServer
因某种原因下线了,Broker
仍然可以向其它NameServer
同步其路由信息,Producer
,Consumer
仍然可以动态感知Broker
的路由的信息。 -
BrokerServer
:Broker
主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker
包含了以下几个重要子模块。-
Remoting Module
:整个Broker
的实体,负责处理来自clients
端的请求。 -
Client Manager
:负责管理客户端(Producer/Consumer
)和维护Consumer
的Topic
订阅信息 -
Store Service
:提供方便简单的API
接口处理消息存储到物理硬盘和查询功能。 -
HA Service
:高可用服务,提供Master Broker
和Slave Broker
之间的数据同步功能。 -
Index Service
:根据特定的Message key
对投递到Broker
的消息进行索引服务,以提供消息的快速查询。
-

部署架构

RocketMQ 网络部署特点
-
NameServer
是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 -
Broker
部署相对复杂,Broker
分为Master
与Slave
,一个Master
可以对应多个Slave
,但是一个Slave
只能对应一个Master
,Master
与Slave
的对应关系通过指定相同的BrokerName
,不同的BrokerId
来定义,BrokerId
为0表示Master
,非0表示Slave
。Master
也可以部署多个。每个Broker
与NameServer
集群中的所有节点建立长连接,定时注册Topic
信息到所有NameServer
。 注意:当前RocketMQ版本在部署架构上支持一Master
多Slave
,但只有BrokerId
=1的从服务器才会参与消息的读负载。 -
Producer
与NameServer
集群中的其中一个节点(随机选择)建立长连接,定期从NameServer
获取Topic
路由信息,并向提供Topic
服务的Master
建立长连接,且定时向Master
发送心跳。Producer
完全无状态,可集群部署。 -
Consumer
与NameServer
集群中的其中一个节点(随机选择)建立长连接,定期从NameServer
获取Topic路由信息,并向提供Topic
服务的Master
、Slave
建立长连接,且定时向Master
、Slave
发送心跳。Consumer
既可以从Master
订阅消息,也可以从Slave订阅消息,消费者在向Master
拉取消息时,Master
服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master
还是Slave
拉取。
结合部署架构图,描述集群工作流程:
-
启动 NameServer
,NameServer
起来后监听端口,等待Broker
、Producer
、Consumer
连上来,相当于一个路由控制中心。 -
Broker启动,跟所有的 NameServer
保持长连接,定时发送心跳包。心跳包中包含当前Broker
信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer
集群中就有Topic
跟Broker
的映射关系。 -
收发消息前,先创建 Topic
,创建Topic
时需要指定该Topic
要存储在哪些Broker
上,也可以在发送消息时自动创建Topic
。 -
Producer
发送消息,启动时先跟NameServer
集群中的其中一台建立长连接,并从NameServer
中获取当前发送的Topic
存在哪些Broker
上,轮询从队列列表中选择一个队列,然后与队列所在的Broker
建立长连接从而向Broker
发消息。 -
Consumer
跟Producer
类似,跟其中一台NameServer
建立长连接,获取当前订阅Topic
存在哪些Broker
上,然后直接跟Broker
建立连接通道,开始消费消息。
总结
本篇介绍了RocketMQ
有哪些功能,以及在RocketMQ
中一些核心概念和特性,最后讲了一下关于RocketMQ
的技术架构和部署架构,从宏观上看了一下生产者、消费者、代理服务、名称服务所担任的角色,以及结合部署架构描述了一下工作流程,想必大家对RocketMQ
已经有大概的认识,下一篇我们将会带来各个模块的相关设计,及原理分析。
扫码关注、一键三连。

作者介绍