俗世游子
2023/05/24阅读:7主题:极简黑
分布式流处理组件-优化篇:Producer生产调优之数据可靠性
💯 作者: 谢先生。 2014年入行的程序猿。多年开发和架构经验。专注于Java、云原生、大数据等技术。从CRUD入行,负责过亿级流量架构的设计和落地,解决了千万级数据治理问题。
📖 微信公众号、B站:搜索「谢先生说技术」不定时更新 ~
📂 清单: goku-framework、【定期开源】享阅读II
前言
经过上一章的介绍,我们从配置参数方面了解到一部分生产调优的方式。当然:
-
参数需要我们通过实际配置进行适当的调整,这是不可避免的~
而本章内容我们从数据可靠性下功夫,当然本文内容理论居多,稍微略显枯燥~
数据可靠性
关于数据可靠与否如何操作,我们在前面已经介绍的多次,主要就是通过配置acks
来保证。
这里我不会重复介绍acks的配置参数之类的,但是其中某一个配置项必须单拎出来,需要用它来引出一批非常重要的概念。这就是acks=all
。
acks=all: 当消息在Leader接收记录,并且等待副本数据同步完成之后,才会返回ack
那这时我们就需要考虑一个问题:如果Follower挂掉怎么办?这不影响执行效率么~ 别急,Kafka引入了如下方式来解决这个问题
ISR、OSR、AR

Kafka的分布式模型为主从架构,Leader负责数据的读写操作,Follower仅仅处理数据同步与备份。在这种模式下,如果Leader由于种种原因出现异常情况,其他Follower将会选举推出新的Leader进而保证整个集群的健康运行。
可是啊,如何保证重新推举出来的Leader的数据是最完善的呢?如果由于种种原因导致同步异常,而选举出来的Leader却是一台数据最烂的节点,对整个集群来讲,多闹心啊~
能有多闹心呢? 往⬇️看,就知道了。 同时,Kafka也为我们解决了这一问题,这就是接下来我们要强行插入的内容
HW、LEO
一个桶能装多少水,从来都是最低的那个木板决定的
在Kafka中,为了解决Leader和Follower之间消息同步的问题,引入了HW、LEO的机制。如图:
LEO表示当前日志文件中下一条待写入消息的位置,也可以说是当前日志文件最后一条消息再加一的位置。同时分区ISR集合中的每一个副本都会维护自身的LEO。
HW俗称高水位。表示一个特定消费的偏移量,消费者在消费的时候只能消费到HW之前的数据。而HW的最高位置是ISR集合节点中LEO最小的值

-- 这是一条分割线
继续介绍ISR
在Kafka中,所有分区副本组成的集合可以成为AR。 而在AR集合中,与Leader保持正常通信,并且同步数据正常的Follower副本,再加上Leader本身能够组成ISR集合。如果Leader不幸挂掉,那么只有ISR集合内的Follower副本能够竞争上岗~
在ISR集合中,如果Follower长时间内【默认30s】不能向Leader发送通信数据或同步数据,那么就会将该Follower从ISR集合中剔除。被ISR剔除的副本将会进入到OSR集合中
我们可以通过
replica.lag.time.max.ms
来设置副本通信时间
如何确保数据不丢失
前面介绍了acks=all的同步机制,如果我们想要保证数据可靠性,acks=all
这个配置时必不可少的
我们做过多次实验
同时数据有备份,ISR集合>1也是需要滴~~ 所以,记住这句话: - (acks=all) + (分区副 本>= 2) + (ISR >= 2)是保证数据可靠性的重要指标
数据重复
在生产端,acks=all
出现最大的问题:数据重复 其实数据重复属于一种正常现象,非正常波动现象造成的数据重试是造成数据重复的主要原因。而出现数据重试的大概率能有如下场景:
-
Leader分区宕机,Kafka进入Leader选举阶段~ -
Controller所在Broker宕机,集群进入Controller选举阶段~ -
网络分区等因素影响~
消费端处理消息,offset维护过程中宕机,重启消费端之后如果再次消费也将造成数据重复。 而关于消费端如何处理消息重复消费我们在后面介绍,本章只介绍Producer的解决方案。 敬请期待~
消息投递
消息在投递的时候可以按照这三种方式来投递:
-
最多一次: 消息最多只会被发送一次,不会重复发送,数据有可能会被丢失。
acks=0属于典型场景
-
至少一次: 消息最少会被发送一次,消息不会丢失,有可能重复发送。 -
精确一次: 消息只发一次,消息不会丢失并且消息不会重复发送。
幂等/事务 + 至少一次 = 精确一次
精确一次的投递方式可以靠如下方案来解决:
-
幂等 -
事务
幂等和事务都是在kafka 0.11之后的版本引入的,通过这两种方式可以解决Producer端消息重复的问题。当然这两种方案在处理过程中都是有一定的限制,接下来我们具体来看
幂等
同样运算逻辑下,每次执行的结果都是一样的。这就是幂等性。很多情况下我们开发的接口都需要保证幂等性。
而kafka是这么判断的:
-
当<PID,Partition,SeqNumber>三者组成的主键唯一时,提交到broker的消息才会被持久化
那么<PID,Partition,SeqNumber>到底是什么东西呢?
-
Producer每次在启动的时候,会申请到一个全局唯一的PID,且Producer或者Kafka每次重启之后都会进行重新分配 -
Partition就是对应的分区号 -
SeqNumber是分区内单调递增的序号
所以其实从这里我们就能明白,如果想要通过幂等来解决数据重复的话,有一个必要的条件
-
单分区单会话
想要在程序中开启幂等,需要这样做
config.setProperty(ProducerConfig.ACKS_CONFIG, "all");
config.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
config.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
这样我们就能在Producer开启幂等,同时我们需要注意:
-
acks必须配置为all -
开启幂等时配置的 max.in.flight.requests.per.connection <= 5
这个参数我们在Producer发送原理章节介绍过
数据有序
在特殊情况下,保证数据有序性也是一个非常关键的业务。在不考虑网络延迟、分区重分配的情况下,想要在Producer端保证消息数据有序性,可以通过如下方式:
- 分区 kafka中每个topic都可以分为不同的分区, 消息可以发送到不同的分区。而如果将消费通过不同分区发送到Broker中,必然是无法保证消息按照指定顺序到达~
同时: Producer消息发送必须在同一线程下,多线程也无法保证
for (int i = 0; i < 10; i++) {
producer.send(
new ProducerRecord<>("transaction-topic", 0, null, "data " + i)
);
}
- 序号 - 时间戳
序号或者时间戳都是一种方式的手段,在发送消息的时候对消息进行编号,然后在消费端进行排序处理
for (int i = 0; i < 10; i++) {
producer.send(
new ProducerRecord<>(
"transaction-topic",
0,
System.currentTimeMillis(),
null,
"data " + i
)
);
}
而关于对端,也就是消费端如何保证消费的有序性,我们在Consumer端细聊
事务
在kafka 0.11版本之后引入了事务机制,用来确定消息在生产者与消费者间的传递具有原子性。
要么全部成功,要么全部失败
事务实现案例
void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
Kafka事务采用与两阶段提交类似的机制来处理,并且围绕以上方法。简单概括如下:
-
配置事务参数:在创建Kafka生产者对象时,需要设置 transactional.id
参数来标识一个事务。该参数的值应该是唯一的,因为它将用于在Kafka服务器上标识该事务。 很重要~~ -
开启事务:在生产者对象中调用 beginTransaction()
方法来开启一个事务。在事务开始后,生产者发送的所有消息都将被视为该事务的一部分。 -
发送消息:在事务中,使用 send()
方法来发送消息。这些消息不会立即被发送到Kafka服务器,而是被缓存在本地事务日志中。 -
预提交:当缓冲区中的消息数量达到一定阈值或者超时时,生产者会调用 sendOffsetsToTransaction()
方法向Kafka集群发送一个预提交请求。该请求包含待提交消息的事务ID以及它们的偏移量(offsets)。-
如果预提交请求成功,Kafka集群会为该事务分配一个全局唯一的事务序列号(transaction sequence number),用于跟踪该事务的状态。
-
-
提交事务:在生产者对象中调用 commitTransaction()
方法来提交事务,该请求会包含事务ID和之前分配的事务序列号。-
如果事务提交成功,Kafka集群会将该事务的所有消息作为一个整体提交,并且标记为已提交
-
-
中止事务:如果在事务提交之前发生异常:例如程序报错、网络异常、集群不可用等。 生产者可以调用 abortTransaction()
来中止事务。-
该操作会将所有未提交消息全部删除,用来释放由该事务占用的资源
-
来看看核心代码:
// required 事务ID 全局唯一
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction.id.1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
try {
for (int i = 0; i < 10; i++) {
producer.send(
new ProducerRecord<>(
"transaction-topic",
"data " + i
)
);
}
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}

自行修改代码产生异常,观察消费端现象
事务流程

Kafka事务处理规则
需要特别注意的是,在使用Kafka事务时,需要遵循一些规则,
-
kafka在0.11版本才引入了事务,所以这是最重要的第一条规则 -
其次,事务消息只能存在于单个主题分区,如下图: 程序并未特定指定某个分区 -
单个事务最多可以包含 transaction.max.timeout.ms
(默认为15分钟)内发送的消息。如果事务超时,则它将自动中止 -
生产者客户端必须使用相同的 transactional.id
来开始和提交事务。该ID用于唯一标识事务,并确保在同一事务范围内发送的所有消息都与该事务关联。
下期预告
本期关于Producer生产者相关内容已经介绍完结,接下来我们会进入到Broker的篇章。
期待~
- END -作者介绍