Janker

V1

2022/09/04阅读:49主题:全栈蓝

RocketMQ核心概念及架构

RocketMQ核心概念及架构

简介

Apache RocketMQ是一个分布式消息和流式平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。

功能介绍

  1. 发布/订阅消息模型
  2. 金融级事务消息
  3. 各种跨语言客户端,如Java、C/C++、PythonGo
  4. 可插拔传输协议,如TCPSSLAIO
  5. 内置消息跟踪功能,还支持opentracing
  6. 多功能大数据和流式生态系统集成
  7. 按时间或偏移量的消息追溯
  8. 同一队列中的可靠FIFO和严格有序消息传递
  9. 高效拉&推消费模型
  10. 单个队列中的百万级消息累积容量
  11. 多种消息传递协议,如JMSOpenMessageing
  12. 灵活的分布式扩展部署体系结构
  13. 闪电式批量信息交换系统
  14. 各种消息过滤器机制,如SQLTag
  15. 用于隔离测试和云隔离群集的Docker映像
  16. 功能丰富的管理仪表板,用于配置、度量和监控
  17. 认证和授权

列举了RocketMQ的这么多功能,其实总结就几个关键词,发布/订阅消息模型(貌似消息中间件都有的功能)、事务消息跨语言客户端传输协议可插拔消息跟踪大数据大容量消息过滤单队列有序消息、等等各种丰富的功能,反正就是厉害,就是nb,跟手机发布会上的台词是一样一样的。

概念和特性

核心概念

Producer(生产者)

负责生产消息,生产者将业务应用程序系统生成的消息发送给代理(broker)。RocketMQ提供了多种发送模式:同步、异步和单向。

Producer Group(生产者组)

相同角色的生产者被分组在一起。代理(broker)可以联络同一生产者组的不同生产者实例,以提交或回滚事务,以防原始生产者在事务之后崩溃。

注意:考虑到提供的生产者在发送消息方面足够强大,每个生产者组只允许一个实例,以避免不必要的生产者实例初始化。

Consumer(消费者)

消费者从代理(broker)中提取消息并将其提供给应用程序。从用户应用的角度来看,提供了两种类型的消费者 :拉取式消费、推动式消费。

PullConsumer(拉取式消费)

Pull consumer主动从代理(broker)中提取消息。一旦拉取了一批消息,用户应用程序就会启动消费过程。

PushConsumer(推动式消费)

另一方面,Push consumer封装了消息拉取、使用进程和维护内部的其他工作,将回调接口留给最终用户实现,在消息到达时执行。

Consumer Group(消费者组)

与前面提到的生产者组类似,角色完全相同的消费者被分组在一起并命名为消费者组。

消费组是一个庞大的概念,在消息消费方面,实现负载平衡和容错的目标非常容易。

注意:消费者组的消费者实例必须具有完全相同的主题订阅。

Topic(主题)

主题是一个类别,生产者投递消息,消费者拉取消息。主题与生产者和消费者的关系非常松散。一个主题可能有零个、一个或多个生产者向其发送消息;相反,生产者可以发送不同主题的消息。在一个消费者的视角下,一个消费者组或多个消费者组可以订阅一个主题。同样,消费者组可以订阅一个或多个主题,只要该组的实例保持订阅一致。

Message(消息)

消息是要投递的信息。消息必须有一个主题,该主题可以解释为您的邮件地址。消息还可能具有可选Tag和额外的键值对。例如,您可以为消息设置一个业务密钥,并在代理(broker)服务器上查找该消息,以便在开发过程中诊断问题。

Message Queue(消息队列)

主题被划分为一个或多个子主题“消息队列”。

Tag(标签)

Tag,换句话说就是子主题,为用户提供了额外的灵活性。使用Tag,来自同一业务模块的具有不同目的的消息可能具有相同的主题和不同的TagTag将有助于保持代码的整洁和一致性,Tag还可以方便RocketMQ提供的查询系统。

Broker(代理服务器)

BrokerRocketMQ系统的主要组成部分。它接收生产者发送的消息,存储它们并准备处理消费者的请求。它还存储与消息相关的元数据,包括消费者组、消费进度偏移量和主题/队列信息。

Name Server (名称服务)

Name Server-名称服务器充当路由信息提供程序。生产者/消费者客户端查找主题以找到相应的代理列表。

Message Model(消息模型)

  • Clustering(集群模式)

    集群模式消费,相同Consumer Group的每个Consumer实例平均分摊消息。

  • Broadcasting(广播模式)

    广播模式消费,相同Consumer Group的每个Consumer实例都接收全量的消息。

Message Order(消息顺序)

当使用DefaultMQPushConsumer时,您可以决定有序地或并发地使用消息。

  • 顺序消息

    有序消费消息意味着消息的消费顺序与生产者为每个消息队列发送消息的顺序相同。如果您正在处理全局顺序是必需的场景,请确保您使用的主题只有一个消息队列。

    注意:如果指定“有序消费”,则消息消费的最大并发性是消费者组订阅的消息队列数,及单个消息队列里保证有序。

  • 无序消息

    当并发消费消息时,消息消费的最大并发性仅受为每个消费客户端指定的线程池的限制。

    注意:在此模式下不再保证消息顺序。

特性

1 订阅与发布

消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。

2 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

3 消息过滤

RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。

4 消息可靠性

RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker非正常关闭
  2. Broker异常Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。

5 至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

6 回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。

7 事务消息

RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

8 定时消息

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

9 消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  • 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

10 消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

11 流量控制

生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
  • 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
  • broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
  • broker通过拒绝send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。

消费者流控的结果是降低拉取频率。

12 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

数据模型

数据模型对应关系如下:

  1. 一个Topic下对应多个消息
  2. 一个Topic可以对应多个Group,反过来同一个group也客户包含多个Topic
  3. 一个Topic下可以有多个queue
  4. 一个Queue对应一个offset
  5. 由3、4可以得出一个Topic可以对应多个offset

架构

技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServerBroker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护ConsumerTopic订阅信息
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master BrokerSlave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

部署架构

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为MasterSlave,一个Master可以对应多个Slave,但是一个Slave只能对应一个MasterMasterSlave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示SlaveMaster也可以部署多个。每个BrokerNameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一MasterSlave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • ProducerNameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • ConsumerNameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的MasterSlave建立长连接,且定时向MasterSlave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServerNameServer起来后监听端口,等待BrokerProducerConsumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有TopicBroker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • ConsumerProducer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

总结

本篇介绍了RocketMQ有哪些功能,以及在RocketMQ中一些核心概念和特性,最后讲了一下关于RocketMQ的技术架构和部署架构,从宏观上看了一下生产者、消费者、代理服务、名称服务所担任的角色,以及结合部署架构描述了一下工作流程,想必大家对RocketMQ已经有大概的认识,下一篇我们将会带来各个模块的相关设计,及原理分析。

扫码关注、一键三连。

分类:

后端

标签:

后端

作者介绍

Janker
V1