俗世游子
2023/03/19阅读:8主题:极简黑
分布式流处理组件-优化篇:Producer生产调优之核心参数
💯 作者: 谢先生。 2014年入行的程序猿。多年开发和架构经验。专注于Java、云原生、大数据等技术。从CRUD入行,负责过亿级流量架构的设计和落地,解决了千万级数据治理问题。
📖 微信公众号、B站:搜索「谢先生说技术」不定时更新 ~
📂 清单: goku-framework、【定期开源】享阅读II
前言
发句牢骚:
最近真的累。年后第三天就去出差。3月回来马上又投入到新项目中~ ┓(;´_`)┏ 最近老危险了。项目都是别的组的~~
前面我们花了较长的时间对生产者Producer理论、Producer分区做了一个比较细致的介绍。详细大家在认真阅读完前两节的内容之后会对Kafka的生产者有一个比较清晰的认知。
其中我们需要重点掌握的内容是:Producer发送消息的过程,如果有不清楚的建议返回好好品味。
-
《分布式流处理组件-理论篇:Producer入门理论》
我们需要注意的是:
-
在生产环境由于物理机器等资源配置的影响,也为了更大程度上保证资源的利用率,我们都会对各个组件进行适配。
而Kafka的Producer也是一样的。在生产环境中也有很多需要注意的点。本章我们就来好好的聊一聊~
啰嗦两句Producer的消息发送原理

为了让大家清晰的理解优化点,我们简单过一下Producer的发送原理 已经明白的请略过~
如上图所示,消息数据通过主线程调用producer.send()
将其发送出去,其中经过拦截器、序列化器、分区器的层层加工之后,记录缓冲区RecordAccumulator
会将加工之后的消息记录添加到其中。而消息也不是单纯的存在于RecordAccumulator
中,为了降低网络IO,Producer将其按照batch的形式进行存放。
RecordAccumulator
当然不是无限大的,自然这里就是我们的第一个优化点而合理的对batch容量进行配置,就是我们所说的第二个优化点
消息的大小也是我们需要考虑的重点,姑且算是一个优化点
而batch消息也会根据分区器计算得到的分区号存在于对应的Deque
双端队列中,所以他们的关系就是图中一层包一层的样子
当batch满足某个条件或者消息等待指定时间之后,sender线程被拉起,Sender程序将不断从缓冲区取出数据,进而进入到另外一个阶段
新的优化点:指定等待时间
从缓冲区拉取出来的数据会被封装为Request
对象,并且与缓存区类似的是:
-
NetworkClient
中同样会存在一个类似缓冲区的存在:InFlightRequests
。其中会按照分区对Request进行存储。所以他们的逻辑关系其实是这样的

随后进行发送,而在默认情况下,如果Broker端一直没有响应,每个分区下的Request只能存放5个请求。而超出的情况将会阻塞发送逻辑。
消息发送成功后,将会清空原始数据。否则尝试重试等操作
Producer高吞吐
注意: 本段属于测试阶段,多图模式
前置条件
来吧,啰嗦完Producer发送过程之后,就到了精彩的测试验证环节。有句话需要重点说明下:
-
以下验证结果不属银弹,实际生产中参数配置如何: 还是需要结合实际业务场景和资源配置给出最佳参数
机器配置:
-
3台 2C4G 的虚拟机 -
Topic设置分区数为3,副本因子为2
测试脚本
本次Producer吞吐量测试脚本在Kafka中已经提供,我们在${KAFKA_HOME}/kafka_2.13-3.3.1/bin
中可以找到kafka-producer-perf-test.sh
。如下对执行参数进行简单说明:
-
--throughput: 限制测试发送消息的最大吞吐量,-1表示不受限制 -
--num-records: 测试消息的数据量。 -
--record-size: 每条消息的大小。 -
--producer-props: Producer可配置参数信息,我们也可以通过properties文件的方式配置到 --producer.config
默认参数负载情况
kafka-producer-perf-test.sh内部也是默认参数,如果机器配置不错可以适当调整JVM参数
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 --throughput -1 --num-records 100000 --record-size 1024
发送10W条消息大小为1KB的消息到newTopic_test001上,不设置吞吐限制
30421 records sent, 6083.0 records/sec (5.94 MB/sec), 2125.8 ms avg latency, 3695.0 ms max latency.
55005 records sent, 11001.0 records/sec (10.74 MB/sec), 3129.7 ms avg latency, 4864.0 ms max latency.
100000 records sent, 9042.408898 records/sec (8.83 MB/sec), 2719.28 ms avg latency, 4864.00 ms max latency, 2598 ms 50th, 4250 ms 95th, 4734 ms 99th, 4832 ms 99.9th.
输出表示:
-
成功消费100000记录,吞吐量:10101.010101条/s (9.86 MB/sec) -
平均延迟2451.60ms,最大延迟4025.00ms -
50%消息延迟2455ms, 95%消息延迟3581ms, 99%消息延迟3875ms, 99.9%消息延迟4016ms。
参数介绍
下面要介绍的这几个参数,在Producer端:任意一个都可以称为王牌的存在。接下来我们就一一来看看:
这些参数在
ProducerConfig
中都有介绍,我们一一来看~
batch.size
每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为16384【16KB】
为了尽可能的提高吞吐量,在实际生产中需要对发送的消息进行合理预估,根据实际情况选择一个合理的大小,避免出现如下情况:
-
单条消息超过 batch.size
,Producer有可能不会处理此消息 -
batch.size
过大,有可能会造成Producer端内存空间的浪费 -
batch.size
过小,频繁的网络IO会降低Producer的吞吐
linger.ms
如果消息迟迟没有达到batch.size
,那么将尝试等待linger.ms
时间发送。默认等待时间为0,也就是当消息到达之后立即发送
但实际上为了减少发送的请求数量,在没有负载的情况下也会延迟5ms的时间。所以这也不是绝对的立即发送~
这两个参数我们或多或少都介绍过,但是并没有测试调整它们对吞吐量的影响。接下来我们就先来测试一波吧!!!
测试batch.size
和linger.ms
对吞吐量的影响
对照以上默认测试结果,然后我们开始进行参数调整。
跟着我的节奏,先对单个参数进行调整,对比差异
-
batch.size=32768
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024
三次输出结果显示
68697 records sent, 13733.9 records/sec (13.41 MB/sec), 900.0 ms avg latency, 1812.0 ms max latency.
100000 records sent, 14817.009927 records/sec (14.47 MB/sec), 1129.92 ms avg latency, 1957.00 ms max latency, 1167 ms 50th, 1884 ms 95th, 1932 ms 99th, 1951 ms 99.9th.
100000 records sent, 22311.468095 records/sec (21.79 MB/sec), 1036.42 ms avg latency, 1467.00 ms max latency, 1100 ms 50th, 1431 ms 95th, 1449 ms 99th, 1462 ms 99.9th.
100000 records sent, 21753.317381 records/sec (21.24 MB/sec), 1001.94 ms avg latency, 1646.00 ms max latency, 955 ms 50th, 1614 ms 95th, 1631 ms 99th, 1639 ms 99.9th.
对比默认结果,我们可以看到在吞吐量上已经有了非常明显的提高
-
linger.ms=3000
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 --throughput -1 --num-records 100000 --record-size 1024
三次输出结果显示
60526 records sent, 12102.8 records/sec (11.82 MB/sec), 1632.5 ms avg latency, 2337.0 ms max latency.
100000 records sent, 12280.486307 records/sec (11.99 MB/sec), 1882.64 ms avg latency, 2591.00 ms max latency, 2050 ms 50th, 2553 ms 95th, 2571 ms 99th, 2584 ms 99.9th.
64966 records sent, 12990.6 records/sec (12.69 MB/sec), 1556.1 ms avg latency, 2757.0 ms max latency.
100000 records sent, 13540.961408 records/sec (13.22 MB/sec), 1719.67 ms avg latency, 2757.00 ms max latency, 1474 ms 50th, 2701 ms 95th, 2730 ms 99th, 2744 ms 99.9th.
71776 records sent, 14355.2 records/sec (14.02 MB/sec), 1506.2 ms avg latency, 2176.0 ms max latency.
100000 records sent, 14320.492625 records/sec (13.98 MB/sec), 1577.49 ms avg latency, 2176.00 ms max latency, 1668 ms 50th, 2134 ms 95th, 2154 ms 99th, 2170 ms 99.9th.
同样还是要和默认测试结果进行对比,虽然吞吐量没有配置batch.size
的效果差异,但也不能说本次调整不重要
-
合并参数测试
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024
三次输出结果显示
100000 records sent, 21739.130435 records/sec (21.23 MB/sec), 1007.46 ms avg latency, 1428.00 ms max latency, 995 ms 50th, 1400 ms 95th, 1411 ms 99th, 1424 ms 99.9th.
100000 records sent, 23041.474654 records/sec (22.50 MB/sec), 952.25 ms avg latency, 1334.00 ms max latency, 919 ms 50th, 1302 ms 95th, 1318 ms 99th, 1328 ms 99.9th.
100000 records sent, 24271.844660 records/sec (23.70 MB/sec), 916.17 ms avg latency, 1318.00 ms max latency, 912 ms 50th, 1278 ms 95th, 1309 ms 99th, 1314 ms 99.9th.
已经可以看出对比了吧。 我们继续~~~
compression.type
该参数对Producer生产的数据进行压缩,主要针对批数据压缩。默认是none【无压缩】,可以用来设置的值:
-
gzip -
snappy -
lz4 -
zstd
接下来我们直接测试这几个压缩算法的性能吧,为了方便写个脚本
#!/bin/bash
for i in gzip snappy lz4 zstd
do
for ((j=0; j<3; j++))
do
echo "----$i方式的第$j次测试----"
kafka-producer-perf-test.sh --topic newTopic_test002 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=65536 compression.type=$i --throughput -1 --num-records 100000 --record-size 1024
done
done
查看每个级别的三次输出
----gzip方式的第0次测试----
100000 records sent, 25425.883549 records/sec (24.83 MB/sec), 136.45 ms avg latency, 744.00 ms max latency, 97 ms 50th, 451 ms 95th, 677 ms 99th, 742 ms 99.9th.
----gzip方式的第1次测试----
100000 records sent, 27019.724399 records/sec (26.39 MB/sec), 54.22 ms avg latency, 291.00 ms max latency, 27 ms 50th, 174 ms 95th, 228 ms 99th, 288 ms 99.9th.
----gzip方式的第2次测试----
100000 records sent, 27940.765577 records/sec (27.29 MB/sec), 21.55 ms avg latency, 267.00 ms max latency, 11 ms 50th, 75 ms 95th, 97 ms 99th, 130 ms 99.9th.
===========分割线=============
----snappy方式的第0次测试----
100000 records sent, 34153.005464 records/sec (33.35 MB/sec), 518.28 ms avg latency, 1047.00 ms max latency, 479 ms 50th, 969 ms 95th, 1029 ms 99th, 1044 ms 99.9th.
----snappy方式的第1次测试----
100000 records sent, 32765.399738 records/sec (32.00 MB/sec), 474.20 ms avg latency, 985.00 ms max latency, 476 ms 50th, 921 ms 95th, 971 ms 99th, 984 ms 99.9th.
----snappy方式的第2次测试----
100000 records sent, 34578.146611 records/sec (33.77 MB/sec), 450.48 ms avg latency, 932.00 ms max latency, 442 ms 50th, 869 ms 95th, 911 ms 99th, 928 ms 99.9th.
===========分割线=============
----lz4方式的第0次测试----
100000 records sent, 34059.945504 records/sec (33.26 MB/sec), 474.75 ms avg latency, 871.00 ms max latency, 461 ms 50th, 818 ms 95th, 852 ms 99th, 867 ms 99.9th.
----lz4方式的第1次测试----
100000 records sent, 37174.721190 records/sec (36.30 MB/sec), 408.28 ms avg latency, 769.00 ms max latency, 418 ms 50th, 716 ms 95th, 753 ms 99th, 767 ms 99.9th.
----lz4方式的第2次测试----
100000 records sent, 32404.406999 records/sec (31.64 MB/sec), 483.38 ms avg latency, 1145.00 ms max latency, 372 ms 50th, 1077 ms 95th, 1122 ms 99th, 1142 ms 99.9th.
===========分割线=============
----zstd方式的第0次测试----
100000 records sent, 51975.051975 records/sec (50.76 MB/sec), 57.53 ms avg latency, 279.00 ms max latency, 54 ms 50th, 108 ms 95th, 141 ms 99th, 158 ms 99.9th.
----zstd方式的第1次测试----
100000 records sent, 47755.491882 records/sec (46.64 MB/sec), 66.12 ms avg latency, 333.00 ms max latency, 49 ms 50th, 208 ms 95th, 316 ms 99th, 332 ms 99.9th.
----zstd方式的第2次测试----
100000 records sent, 51203.277010 records/sec (50.00 MB/sec), 60.27 ms avg latency, 265.00 ms max latency, 49 ms 50th, 148 ms 95th, 172 ms 99th, 193 ms 99.9th.
===========分割线=============
根据本次测试,而且测试命令也运行了多次,确实是zstd
较好。 而我们再看看官网给出的说明:
留个坑,下来我重新试试~~~
acks
前面章节中其实我们介绍过acks
这个参数。
我们再啰嗦一下acks的可设置值
-
"0": 当消息调用 send()
发送出去之后就表示消息已经发送成功,不管消息是否已经到达broker -
"1": 消息发送后,Leader接收到消息并记录到本地之后,不需要同步数据到副本就能进行ack返回 -
"all": 当消息在Leader接收记录,并且等待副本数据同步完成之后,才会返回ack。 该级别也属于 Java#Producer
的默认配置
接下来我们针对以上三种级别来做一个效果验证
基本就是默认配置,只不过多了一个acks
的配置项
#!/bin/bash
for i in 0 1 all
do
echo "---------------acks=$i------------------"
for ((j=0; j<3; j++))
do
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 acks=$i --throughput -1 --num-records 100000 --record-size 1024
done
done
查看每个级别的三次输出
有数据有真相
--------------acks=0------------------
100000 records sent, 43878.894252 records/sec (42.85 MB/sec), 60.90 ms avg latency, 367.00 ms max latency, 20 ms 50th, 218 ms 95th, 297 ms 99th, 363 ms 99.9th.
100000 records sent, 38699.690402 records/sec (37.79 MB/sec), 121.56 ms avg latency, 823.00 ms max latency, 60 ms 50th, 469 ms 95th, 736 ms 99th, 814 ms 99.9th.
100000 records sent, 37650.602410 records/sec (36.77 MB/sec), 127.71 ms avg latency, 441.00 ms max latency, 100 ms 50th, 351 ms 95th, 403 ms 99th, 440 ms 99.9th.
---------------acks=1------------------
100000 records sent, 30284.675954 records/sec (29.57 MB/sec), 387.87 ms avg latency, 897.00 ms max latency, 340 ms 50th, 799 ms 95th, 855 ms 99th, 893 ms 99.9th.
100000 records sent, 36995.930448 records/sec (36.13 MB/sec), 247.35 ms avg latency, 826.00 ms max latency, 199 ms 50th, 703 ms 95th, 789 ms 99th, 813 ms 99.9th.
100000 records sent, 37425.149701 records/sec (36.55 MB/sec), 394.01 ms avg latency, 850.00 ms max latency, 389 ms 50th, 784 ms 95th, 814 ms 99th, 841 ms 99.9th.
---------------acks=all------------------
76362 records sent, 15272.4 records/sec (14.91 MB/sec), 1422.4 ms avg latency, 1980.0 ms max latency.
100000 records sent, 16084.928422 records/sec (15.71 MB/sec), 1463.71 ms avg latency, 1980.00 ms max latency, 1628 ms 50th, 1903 ms 95th, 1960 ms 99th, 1974 ms 99.9th.
81151 records sent, 16223.7 records/sec (15.84 MB/sec), 1404.0 ms avg latency, 2201.0 ms max latency.
100000 records sent, 16906.170752 records/sec (16.51 MB/sec), 1397.82 ms avg latency, 2201.00 ms max latency, 1491 ms 50th, 2001 ms 95th, 2136 ms 99th, 2193 ms 99.9th.
88156 records sent, 17631.2 records/sec (17.22 MB/sec), 1245.0 ms avg latency, 1688.0 ms max latency.
100000 records sent, 17969.451932 records/sec (17.55 MB/sec), 1255.73 ms avg latency, 1688.00 ms max latency, 1403 ms 50th, 1598 ms 95th, 1635 ms 99th, 1668 ms 99.9th.
所以:
-
如果是类似日志、行为等不重要的消息,建议将 acks
设置为0. -
其他的就根据消息的安全程度来进行合理的选择吧~
下期预告
本期针对Producer调优参数的介绍和测试对比到这里就已经结束了。还是一句话:
-
从数据看结果:实际生产中进行测试,选择合理的配置参数信息是必须得~
下一期针对数据可靠性我们来做一个详细的介绍。 期待~
- END -作者介绍