俗世游子
2023/02/03阅读:17主题:极简黑
分布式流处理组件-理论实战结合篇:Kafka实战演练,基操勿6~
前言
本文属于《分布式流处理组件-理论实战结合篇:Kafka架构理论》的姊妹篇,上一篇文章仔细阅读发现了一点小问题:
-
文章内容过于理论,与文章标题严重不符 为了保证不被嫌弃,马上开启新篇章:专业实战篇。
根据规划,本篇将分三个系列对Kafka基础操作进行演练
-
Topic 针对Topic的CRUD操作,并且详细区分分区与副本的概念
-
Producer 消息生产者
-
Consumer 消息消费者
除此之外,还会为大家展示在Kafka在ZK中存储的数据信息,那么我们来开始吧~
实战
Topic实战
在Kafka中,Topic是一个虚拟的概念,本身不承担任何和数据相关的作用。但是作为生产者和消费者间桥梁,Topic起到了及其重要的作用。任意生产或者消费都是针对Topic来做的操作。
在Kafka中通过kafka-topic.sh进行操作,当直接执行kafka-topic.sh
脚本,会出现如下提示
Create, delete, describe, or change a topic.
并且还附带出相关的全部附带参数信息
以下内容很重要:
-
kafka-topic.sh
执行时,必须携带一个功能:如CRUD。 -
其次,需要注意参数信息中出现 REQUIRED
字样的 举点例子
# 创建topic
kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --topic newTopic001 --partitions 3 --replication-factor 2
# 查看topic列表
kafka-topics.sh --bootstrap-server master:9092,node01:9092,node02:9092 --list
# 查看指定topic的详情
kafka-topics.sh --bootstrap-server master:9092,node01:9092,node02:9092 --describe --topic newTopic001
以上的操作需要注意几个参数
参数 | 是否必填 | 描述 |
---|---|---|
bootstrap-server | 是 | kafak中broker节点的链接信息,通过host:port的形式跟在后面;如果是集群描述,则已host1:port1,host2:port2的形式 |
topic | - | 主题名称,在create、describe、delete下必填 |
create | - | 创建一个topic |
delete | - | 删除topic |
describe | - | 查看指定topic的详细信息 |
list | - | 查看当前kafka下的topic列表信息 |
partitions | - | 指定新创建topic的分区数,这里指定不能超过broker的节点数 |
当前配置不能超过broker节点数以上就是针对kafka-topic.sh 操作的相关核心参数。大家可以自己去做一个实践操作~ |
分区副本默认情况下有一套自己的放置策略,当然也是让我们自己自定义的。 别急,精彩内容都在~
彩蛋:自定义分区副本节点
程序员要有程序员的基本素养,虽然我们不干运维的工作,但是我们应该清楚一点:Broker节点所在服务器资源,尤其是磁盘空间,不一定是完全一样的。
此时,我们就不能采用默认创建Topic的方式,而是需要通过人工干预,将Topic的分区副本放置在不同的服务器节点上,以便更大限度的利用服务器资源。
接下来将为大家演示整个手动干预的过程,首先我们先来创建一个演示Topic,然后查看整体信息
kafka-topics.sh --bootstrap-server master:9092,node01:9092 --create --topic test-reassignment --partitions 3 --replication-factor 2
关于创建Topic时默认的分区副本放置策略,我们暂时先不聊。它本身的放置策略这个我们在Broker中细说,但是从整体上来看,我们能够发现默认的副本放置相对比时较均匀的。
均匀放置可不行,接下来我们开始演示如何手动干预。这里需要用到另一个脚本kafka-reassign-partitions.sh
,其他的暂且不看,主要观察如下参数
-
--reassignment-json-file 这个参数需要我们创建一个JSON文件,然后通过他提供的相关实例,将我们的副本存储计划进行编写
{
"version": 1,
"partitions": [
{
"topic": "test-reassignment",
"partition": 0,
"replicas": [0,1,2]
},
{
"topic": "test-reassignment",
"partition": 1,
"replicas": [1,2]
},
{
"topic": "test-reassignment",
"partition": 2,
"replicas": [1]
}
]
}
随后我们通过--execute
指令对存储计划进行执行,当输出如下结果表示计划执行成功
kafka-reassign-partitions.sh --bootstrap-server master:9092,node01:9092 --reassignment-json-file reassignment-json-file.json --execute
随后开始关键性的验证流程,还是相同的参数,不过将--execute
换为--verify
验证完成之后,整个存储计划执行结束
kafka-reassign-partitions.sh --bootstrap-server master:9092,node01:9092 --reassignment-json-file reassignment-json-file.json --verify
再次查看Topic的详细信息,发现在Replicas处的节点信息已经发生了改变,随之Isr相对应的变化,说明了本次人工干预副本放置已成功。
关于其他更多内容,我们后面一步步揭秘~
Producer实战
下面我们进入到Producer的实战环节
# 生产者最小执行单位
kafka-console-producer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092
本质上了解到上面的程序就已经可以了,但是作为一名资深猿,不允许大家了解的这么片面。
那么接下来我们一起看看其他的参数信息
参数 | 是否必填 | 描述 |
---|---|---|
bootstrap-server | 是 | kafak中broker节点的链接信息,通过host:port的形式跟在后面;如果是集群描述,则已host1:port1,host2:port2的形式 |
topic | 是 | 主题名称, |
牢记,该参数能够提高Producer的吞吐量更多参数暂时也用不到,随后大家自行尝试 |
Consumer实战
Producer端负责生产消息,那么Consumer就负责对消息进行消费。到此能够对整个生产-消费的流程形成闭环。
接下来我们来看看如何进行消费吧
# 每次启动都从第一条消息进行消费
kafka-console-consumer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092 --from-beginning
# 设置分组,只会消费最新的消息
kafka-console-consumer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092 --group topic_group_001
QA:--group的配置
后面当我们演示了通过Java程序来执行消费者的时候会发现:
-
消费者必须要配置group 而在 kafka-console-consumer.sh
并没有强制要求,主要是因为在脚本模式下会对group进行默认配置,所以脚本模式并不需要这样的强制要求
QA:--from-beginning的配置
当前消费者会重复消费本Topic内的消息,在实际开发过程中也需要根据实际业务酌情考量
那行,其他的都不说了,接下来放松一下,看个动图:感受一下生产者和消费者间的联动
下期预告
到了这里,本次有关Kafka的基本实战操作就此结束。
关注我,后面带你详细了解Producer生产者
- END -作者介绍