进击的Matrix

V1

2022/06/25阅读:48主题:自定义主题1

RocketMQ的一些最佳实践

文章目录

将消息中间件RocketMQ服务/业务跑起来了,但是并不意味着跑得好。因为如何最佳的使用消息中间件RocketMQ,发挥RocketMQ架构设计优势,最大化使用服务器性能,无论是对中间件服务的部署,维护还是对中间件的使用者(生产者,消费),都是一个不断追求和探索的过程。

当单服务的流量或者业务的流量达到一定量级时,如何提高RocketMQ服务吞吐量?服务的稳定性?如何提高生产者,消费者性能?这在业务流量大的公司里面需要面临的问题,如何得出最佳的RocketMQ实践。其实又需要根据每个公司业务特点,使用场景,具体问题具体分析,不停摸索可能才能找到基于业务和使用场景的最佳实践。

当然RocketMQ也给出了一些通用性的最佳实践建议,希望可以给RocketMQ开发者一些建议和思路,下面介绍RocketMQ给出的一些最佳实践。

Broker的最佳实践

Broker角色选择

代理角色有:

  • 同步主节点(SYNC_MASTER)
  • 异步主节点(ASYNC_MASTER)
  • 从节点(SLAVE)

如果您不能容忍消息丢失,我们建议你部署同步主节点并在其上附加从节点。如果您对丢失消息可以容忍,但希望Broker始终可用,则可以将异步主节点和从节点一起部署,如果您只想简单一点,您可以只需要一个没有从节点的异步主节点即可。

磁盘刷新类型选择

建议使用异步刷新,因为同步刷新是非常昂贵的,会造成太多的性能损失。如果你想要可靠性,我们建议您使用同步主节点和从节点。

生产者的最佳实践

发送状态

当发送一条消息,你将会得到一个发送结果,里面包含发送状态,首先我们假设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到OK, 以下是每个状态描述列表:

FLUSH_DISK_TIMEOUT:刷新磁盘超时时间

如果Broker设置了MessageStoreConfigFlushDiskType=SYNC_FLUSH(默认是ASYNC_FLUSH),并且代理未在Messagestoreconfigsyncflushtimeout(默认为5秒)超时时间内完成磁盘刷新,则将获得此状态。

FLUSH_SLAVE_TIMEOUT:刷新从节点超时时间

如果Broker的角色是同步主机(默认异步主机),如果从节点没有在MessageStoreConfigsyncFlushTimeout(默认为5秒)时间内异步刷新,则将获得此状态。

SLAVE_NOT_AVAILABLE:从节点不可用

如果Broker的角色是同步主代理(默认是异步主代理),但是没有配置从属代理,则将会获得这个状态。

SEND_OK:发送成功

发送"确定"并不意味着它是可靠的,为确保不会丢失任何消息,Borker代理的角色应该启用同步主节点或同步刷新。

Duplication or Missing: 重复或丢失

如果您得到了FLUSH_DISK_TIMEOUTFLUSH_SLAVE_TIMEOUT,并且Borker在此时正好关闭,那么您发现您的消息丢失了。

这个时候你有两种选择,一个是代码上忽略,这可能导致这个消息丢失,另一个是代码重新发送消息,这个可能会导致消息重复。通常,我们建议重新发送,并在消费时找到处理重复删除的方法,除非你觉得当一些信息丢失时没有关系。但是请记住,当得到SLAVE_NOT_AVAILABLE时,重新发送消息是无用的,如果发生这种情况,您应该保留好消息信息并警告集群管理器。

超时设置

客户端向Broker集群发送请求,并等待响应,但是如果最大等待时间已经过去,并且没有返回响应,则客户端将抛出一个RemotingTimeoutException。默认等待时间是3秒。您可以通过使用send(msg,timeout)方法来设置超时时间。注意,我们不建议等待时间设置得太短,因为代理需要一些时间来刷新磁盘或与从属服务器同步。

另外,如果该值超过了syncflushtimeout太多,则效果可能很小,因为代理可能会在超时之前返回FLUSH_SLAVE_TIMEFLUSH_SLAVE_TIMEOU

消息体大小

我们建议消息体的大小不超过512K。

异步发送(Async Sending)

默认的send(msg)方法在接收到返回响应消息之前,将会被一直阻塞,因此,如果您关心性能,我们建议您使用send(msg,callback),生产者发送它将以异步的方式进行工作。

生产组(Producer Group)

通常,生产者组是没有影响的。但是如果您加入了一个事务,您就应该注意,默认情况下,在同一个JVM中只能创建一个具有相同生产组的生产者,这通常已经足够了。

线程安全

生产者是线程安全的,您可以在业务解决方案中使用它。

性能设置

如果你希望在一个JVM中有多个生产者进行大数据处理。我们建议:

  • 使用几个生产者异步发送(3-5个就足够了)
  • 为每个生产者设置实例名称

消费者的最佳实践

消费组和订阅

首先,你需要注意的是,不同的消费组可以独立的消费同一个主题,并且每个消费组都有自己的消费偏移量。请确保同一组中的每个消费者订阅相同的主题。

消息监听器

顺序消费:Orderly

顺序消费:消费者将锁定每个消息队列,以确保按顺序逐个消费它。这个将导致性能损失,但当你关心消息的顺序时,它是有用的。不建议抛出异常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

并发消费:Concurrently

顾名思义,消费者将并发消费。建议使用它以获得良好的性能,不建议抛出异常,你可以使用ConsumeConcurrentlyStatus.RECONSUME_LATER来进行替代。

消费状态:Consume Status

MessageListenerConcurrently来说,你可以返回RECONSUME_LATER告诉消费者你现在不能马上消费它,想稍等一会儿再消费它,然后你可以继续消费其他消息,对MessageListenerOrderly来说,因为你关心的是顺序,你不能跳过某一条消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT(暂停当前队列)来告诉消费者等待一会儿。

Blocking

不建议使用阻塞监听器。因为它会阻塞线程池,最终可能会导致消费进程的停止。

消费者线程数设置

消费者使用ThreadPoolExecutor来处理内部消费,因此您可以通过setConsumeThreadMin或者setConsumeThreadMax来改变最大消费线程数和最大消费线程数。

从哪里开始消费

当一个新的消费组建立时,它将需要决定是否需要消费已经存在Broker代理中的历史消息。

  • CONSUME_FROM_LAST_OFFSET: 这个配置将会忽略历史消息,并消费之后产生的消息。

  • CONSUME_FROM_FIRST_OFFSET: 这个配置将会消费Broker代理中存在的所有消息。

  • CONSUME_FROM_TIMESTAMP: 这个配置将消费指定时间戳之后生成消息。

重复消费

许多情况可能导致重复消费,例如:

  • 生产者重复发送(例如: 在FLUSH_SLAVE_TIMEOUT的情况下)。
  • 消费者关闭,某些补偿机制未及时更新到Broker

因此,如果应用程序不能容忍重复消费,你可能需要做一些外部工作来处理这一问题,例如: 通过检查数据库的主键,来达到处理重复消费问题,或者消费幂等处理。

名称服务的最佳实践

Apache RocketMQ中,NameServer名称服务器被设计为协调分布式系统中的每个组件,协调主要通过管理主题路由信息来实现。

管理主要由两部分构成:

  • Borker定期更新保存在每个名称服务器中的元数据。
  • 名称服务器为客户端服务,包括生产者,消费者和命令行客户机提供最新的路由信息。

因此,在启动Broker代理和客户端之前,我们需要告诉他们如果通过名称服务器提供名称服务器地址列表来访问名称服务器。在Apache RocketMQ中可以通过四种方式实现。

1. 编程的方式配置

对于Broker代理,我们可以在Broker的配置文件中配置namesrvAddr=name-server-ip1:port;name-server-ip2:port参数。

对于生产者和消费者,我们按照如下方式向他们提供名称服务器地址列表:

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

如果使用shell中的admin命令行,也可以指定这种方式:

sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION

简单示例一下:sh mqadmin -n localhost:9876 clusterList 在名称服务器节点上查询集群信息。 如果您已经将管理工具集成到自己的看板中,则通过如下方式实现:

DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt("please_rename_unique_group_name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

2. Java配置项

名称服务器地址列表可以通过在启动之前指定后继Java选项rocketmq.namesrv.addr来提供给应用程序。

3. 环境变量设置

您可以设置环境变量NAMESRV_ADDRBroker代理和客户端将会进行环境变量检查,如果设置了将会读取。

4. HTTP方式

如果没有使用前面提到的方法指定获取名称服务器的地址列表,Apache RocketMQ将每隔两分钟访问以下HTTP地址来获取和更新名称服务器列表,初始延迟为10秒。

默认HTTP地址为:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

您也可以通过rocketmq.namesrv.domain Java 环境变量来进行重写这个jmenv.tbsite.net默认地址,可以通过rocketmq.namesrv.domain.subgroup Java环境变量来重写nsaddr名称服务器地址。

如果您在生产环境中运行Apache RocketMQ,建议使用此方法,因为它为您提供了最大的灵活性--您可以动态的添加和删除名称服务器节点,而无需根据名称服务器的系统负载重新重启Broker和客户端。

如果对以上配置名称服务器方法进行优先级排序,那首先介绍的方法优于后面介绍的方法:

Programmatic Way > Java Options > Environment Variable > HTTP Endpoint

RocketMQ JVM/Linux配置

这是配置RocketMQ代理JVM/OS参数的简介。它指出了在部署RocketMQ集群之前应该考虑的特定配置。

JVM参数设置

建议使用最新发布的JDK1.8版本,带有服务器编译器和8G堆内存空间,设置相同的XmsXmx值,以防止JVM调整堆的大小以获得更好的性能。一个简单JVM配置参数如下:

-server -Xms8g -Xmx8g -Xmn4g

如果您不关系RocketMQ代理启动时间,那么请预先分配Java堆内存,来确保JVM初始化期间分配每个页,是一个更好的选择,如果您不关心启动时间,您就可以这么设置:

-XX:+AlwaysPreTouch

禁用有偏锁定可以减少JVM暂停:

-XX:-UseBiasedLocking

对于垃圾收集器,建议使用JDK 1.8G1收集器

-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

这些GC参数看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。

不要将-XX:MaxGCPauseMillis值设置得太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的小GC

建议使用滚动GC日志文件:

-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

如果写入GC文件会增加代理的延迟,请考虑将GC日志文件重新定向到内存文件系统:

-Xloggc:/dev/shm/mq_gc_%p.log

Linux内核参数设置

有一个操作系统脚本:os.sh,在该脚本文件目录bin中列出了许多内核参数,这些参数可以用于生产使用,只需要稍作更改,以下参数需要注意,更多详细信息请参阅/proc/sys/vm/* [1] 的文档。

  • vm.extra_free_kbytes

    告诉VM在后台回收(kswapd)开始的阈值和直接回收(通过分配进程)开始的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的高延迟。

  • vm.min_free_kbytes

    如果将其设置为低于1024KB,则系统将被微妙地破坏,并且在高负载下容易死锁。

  • vm.max_map_count

    限制进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLogConsumeQueue,因此建议为该参数设置更大的值。

  • vm.swappiness

    定义内核交换内存页的攻击性。较高的值将增加积极性,较低的值将减少互换量。建议为该值设置10以避免交换延迟。

  • File descriptor limits

    RocketMQ需要打开文件描述符(CommitLogConsumerQueue)和网络连接。建议将文件描述符设置为655350。

  • Disk scheduler

    RocketMQ建议使用截止日期I/O调度器,它试图为请求提供有保证的延迟,请参考文档[2]。

参考文档:

  1. Linux内核参数文档
  2. Red Hat I/O调度文档

最后欢迎大家点赞、收藏、评论,转发!

欢迎大家关注我的微信公众号!微信搜索:进击的Matrix

微信公众号-进击的Matrix
微信公众号-进击的Matrix

分类:

后端

标签:

后端

作者介绍

进击的Matrix
V1

软件开发工程师