进击的Matrix

V1

2022/06/12阅读:16主题:自定义主题1

RocketMQ架构设计

文章目录

RocketMQ基本架构
RocketMQ基本架构

概述

Apache RocketMQ是一个分布式消息和流处理平台,具有低延迟,高性能和高可靠性,具有亿万级容量和灵活的可扩展性。它由四部分组成:名称服务器,代理服务器,生产者和消费者,它们中的每一个集群都可以水平扩展,而不会出现单点故障,如上图所示。

  • 名称服务器集群

名称服务器提供轻量级服务发现和路由。每个名称服务器记录完整的路由信息,提供相应的读写服务,支持快速的存储扩展。

  • 代理集群

代理关注的是消息存储,它通过提供轻量级主题(TOPIC)和队列(QUEUE)机制来处理消息存储。他们支持推,拉模型,包含容错机制(2个副本或3个副本),能够抵御强峰值,具备按序积压千亿条消息的的能力。此外,代理还提供容灾,丰富的度量统计数据和报警机制,这些都是传统消息系统所缺少的。

  • 生产者集群

生产者支持分布式部署,分布式生产者通过多种负载平衡模式向代理集群发送消息,发送进程支持快速故障和低延迟。

  • 消费者集群

消费者集群也支持推,拉模式的分布式部署。它还支持集群消费和消息广播。它提供了实时消息订阅机制,可以满足大多数消费者的需求,RocketMQ的网站为感兴趣的用户提供了一个非常简单的快速入门指南。

名称服务

名称服务器是一个功能齐全的服务,主要包含两个功能:

  • 代理管理:名称服务器从代理集群接收注册,并提供心跳机制来检查代理是否存活。
  • 路由管理:每个名称服务器将保存有关代理集群的整个路由信息和用于客户端查询的队列信息。

如我们所知,RocketMQ客户端(生产者/消费者)将从NameServer查询队列路由信息,但是客户端如何找到NameServer地址的呢? 有四种方式向客户端提供名称服务器地址列表,如下:

  • 编程方式: ext : producer.setNamesrvAddr("ip:port").
  • Java 配置: ext:  rocket.namesrv.addr.
  • 环境变量: ext: NAMESRV_ADDR.
  • HTTP端点.

关于更深入的介绍客户端如何找到NameServer地址的,请查看提供名称服务器地址列表的四种方法

代理服务

代理服务器负责消息存储和传递,消息查询,高可用保证等。 如下图所示, 代理服务器有以下几个重要的子模块:

Broker服务器服务功能
Broker服务器服务功能
  • 远程处理模块:代理的入口,处理来自客户端的请求。
  • 客户端管理模块:管理客户端(生产者/消费者)并维护消费者的主题订阅。
  • 存储服务模块:提供简单的API来存储或查询物理磁盘中的消息。
  • 高可用服务模块:在主从代理之间提供数据同步功能。
  • 索引服务:根据特定key,建立消息索引,并提供快速消息查询。

部署服务

本节介绍生产就绪,部署解决方案。一般来说,我们正在部署一个没有单点故障的弹性RocketMQ集群。

前提条件

在开始本节之前,请确保您已经阅读了快速上手部分,并且熟悉RocketMQ的核心概念和组件。

生产就绪部署

  • 名称服务器

为了确保集群在一个实例宕机时仍然能够正常工作,建议使用两个或多个名称服务器实例,只要有一个名称服务器实例处于存活状态,整个集群就保持服务状态。 名称服务器遵循无共享设计模式,代理服务器将心跳数据发送到所有名称服务器,生产者和消费者可以在发送/消费消息时从任何可用的名称服务器查询元数据。

  • 代理

代理可以根据其角色分为两类:主代理和从代理。主代理提供RW(读写)访问,而从代理只接收读访问。 要在没有单点故障的情况下部署高可用RockeMQ集群,应该部署一系列代理集群。一个代理集群含一个主代理和几个从代理,其中主代理brokerid设置为0,从代理brokerid设置为非0即可。一组代理集群中都代理有相同的代理名称(brokerName)。在极端情况下,在一个代理集中至少需要设置两个代理。每个主题驻留在两个或多个代理中。

配置

部署RocketMQ集群时,建议使用以下配置: Broker configuration

Property Name Default value Details
listenPort 10911 listen port for client
namesrvAddr null name server address
brokerIP1 InetAddress for network interface Should be configured if having multiple addresses
brokerName null broker name
brokerClusterName DefaultCluster this broker belongs to which cluster
brokerId 0 broker id, 0 means master, positive integers mean slave
storePathCommitLog $HOME/store/commitlog/ file path for commit log
storePathConsumerQueue $HOME/store/consumequeue/ file path for consume queue
mapedFileSizeCommitLog 1024 _ 1024 _ 1024(1G) mapped file size for commit log
deleteWhen 04 When to delete the commitlog which is out of the reserve time
fileReserverdTime 72 The number of hours to keep a commitlog before deleting it
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLVAE
flushDiskType ASYNC_FLUSH {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.

CLI管理工具

RocketMQ提供了一个CLI(命令行界面)管理工具,用于查询,管理和诊断各种问题。 如何获得 管理工具是随RocketMQ一起提供,你要么下载一个预构建的二进制版本,要么自己从源代码构建,这样你就拥有它了。 如果您需要源代码, RocketMQ工具模块包含其源代码。 如何使用 管理工具非常容易使用,这里出于演示的目的,假设为Linux的环境。在mq安装目录下的/bin目录中,使用bash命令: mqadmin, 就可以看到以下的帮助菜单:

The most commonly used mqadmin commands are:
updateTopic          Update or create topic
deleteTopic          Delete topic from broker and NameServer
updateSubGroup       Update or create subscription group
deleteSubGroup       Delete subscription group from broker
updateBrokerConfig   Update broker's config
updateTopicPerm      Update topic perm
topicRoute           Examine topic route info
topicStatus          Examine topic Status info
topicClusterList     get cluster info for topic
brokerStatus         Fetch broker runtime status data
queryMsgById         Query Message by Id
queryMsgByKey        Query Message by Key
queryMsgByUniqueKey  Query Message by Unique key
queryMsgByOffset     Query Message by offset
queryMsgByUniqueKey  Query Message by Unique key
printMsg             Print Message Detail
sendMsgStatus        Send msg to broker
brokerConsumeStats   Fetch broker consume stats data
producerConnection   Query producer's socket connection and client version
consumerConnection   Query consumer's socket connection, client version and subscription
consumerProgress     Query consumers's progress, speed
consumerStatus       Query consumer's internal data structure
cloneGroupOffset     Clone offset from other group
clusterList          List all of clusters
topicList            Fetch all topic list from name server
updateKvConfig       Create or update KV config
deleteKvConfig       Delete KV config
wipeWritePerm        Wipe write perm of broker in all name server
resetOffsetByTime    Reset consumer offset by timestamp(without client restart)
updateOrderConf      Create or update or delete order conf
cleanExpiredCQ       Clean expired ConsumeQueue on broker.
cleanUnusedTopic     Clean unused topic on broker
startMonitoring      Start Monitoring
statsAll             Topic and Consumer tps stats
syncDocs             Synchronize wiki and issue to github.com
allocateMQ           Allocate MQ
checkMsgSendRT       Check message send response time
   clusterRT            List All clusters Message Send RT

主从复制模式

为了确保不会丢失任何成功发布的消息,RocketMQ提供了一种复制模式,通过两种复制方式: 同步和异步,以获得更强的持久性和高可用性。

主从复制: 同步/异步代理 与许多复制系统一样,同步代理要等到提交日志被复制到从服务器后才能确认。相反,异步代理在主服务器上处理消息后立即返回。

如何配置 在conf文件夹下的rocketmq发行版附带了三个预构建的配置供您参考。

2m-2s-sync
2m-2s-async
2m-noslave

注意: 所有的配置使用异步刷新的方式.

部署 以2M-2S-SYNC的部署为例,首先,启动两个名称服务器,如快速启动部分所示: 假设他们的IP为192.168.0.2和192.168.0.3 开启代理(假设二进制rocketmq位于/home/rocketmq/dist)

>cd /home/rocketmq/dist/bin
>bash mqbroker -c ../conf/2m-2s-sync/broker-a.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
How to verify
Execute the following command to verify according to the CLI section:
> bash mqadmin clusterlist

核心概念

RocketMQ基本模型
RocketMQ基本模型

了解了MQ的一些基本模型和概念之后,我们可以深入探讨消息传递系统设计的一些问题:

  • 消费者并发问题
  • 消费者热点问题
  • 消费者负载均衡问题
  • 消息路由
  • 连接多路复用
  • 灰度部署(Canary Deployments)

生产者

生产者将业务应用程序系统生成的消息发送给代理服务器,RocketMQ提供了多种发送模式: 同步,异步和单向传输。 生产组 相同角色的生产者被分组在一起。如果一台生产者实例在处理事务时宕机了,代理可以联系同一生产者组的不同生产者实例来提交或者回滚事务。

考虑到所提供的生产者在发送消息时足够强大,每个生产组只允许一实例,以避免不必要的生产者实例初始化。

消费者

消费者从代理服务器中拉取消并将消息输入应用程序。从用户应用的角度来看,提供了两种类型的消费者: 推送消费 punsh-consumer封装了消息拉取,消费进度和维护内部的其他工作,为最终用户留下一个回调接口,该接口将在消息到达时执行。 拉取消费 拉消费者积极从代理服务器中拉取消息,一旦一批消息被拉取出来,用户应用程序就会启动消费过程。 消费组 与前面提到的生产者组类似,具有完全相同角色的消费者被分组在一起,并命名为消费者组。 消费组是一个很好的概念,使得在消息消费方面,实现负载均衡和容错的目标非常容易。

注意:消费者组的消费实例必须具有完全相同的主题订阅.

主题

主题是生产者投递消息,消费者拉取消息的一个类别。主题的生产者,消费者的关系非常松散。具体来说,一个主题可以有0个,1个或者多个向其发送消息的生产者;相反,生产者可以发送不同主题的消息。从消费者角度来看,一个主题可以由0个,1个或多个消费者群体订阅。同样,只要消费组的实例保持订阅一致,用户组就可以订阅一个或多个主题。

消息

消息是要传递的信息。消息必须有一个主题,可以将其解释为要邮寄信件的地址。消息还可以具有可选的标记和额外的键值对。例如,您可以为消息设置业务ke,并在代理服务器上查找消息,以诊断开发过程中的问题。 消息队列 主题被划分为一个或多个子主题:"消息队列"。 标签 换句话说,标签子主题为使用者提供了额外的灵活性。对于标签,来自同一业务模块的具有不同目的的消息,可能具有相同的主题和不同标记。标签将有助于保持代码的整洁和一致,而且标签还可以帮助RocketMQ提供的查询系统。 代理 代理是RocketMQ系统的主要组成部分,它接收来自生产者的消息,存储它们,并准备处理来自消费者的拉取请求。它还存储与消息相关的元数据,包括消费组,消费进度偏移量和主题/队列信息。

名称服务器

名称服务器用作路由信息的提供者。生产者/消费者客户端查找主题以查找相应的Broker列表。

消息模型

  • 集群
  • 广播

消息顺序

当使用DefaultMQPushConsumer时,您可以决定是有序的或者是并发的消费消息。

  • 有序的

按顺序消费MQ意味着,MQ的消费顺序与生产者为每个消息队列发送的顺序相同,如果您处理全局顺序是必需的场景,请确保您使用的主题只有一个消息队列。

注意:如果指定了按顺序消费,则MQ消费的最大并发性是消费组订阅的消息队列数。

  • 并发的

当并发消费消息时,消费的最大并发性仅仅受每个消费者客户端指定的线程池的限制。

注意:在此模式下,不再保证消息的顺序

最后欢迎大家点赞、收藏、评论,转发!欢迎大家关注我的微信公众号!随机分享无用的计算机知识,微信搜索:进击的Matrix

分类:

后端

标签:

Java

作者介绍

进击的Matrix
V1

软件开发工程师