俗世游子

V1

2023/02/03阅读:17主题:极简黑

分布式流处理组件-理论实战结合篇:Kafka实战演练,基操勿6~

图片
图片

前言

本文属于《分布式流处理组件-理论实战结合篇:Kafka架构理论》的姊妹篇,上一篇文章仔细阅读发现了一点小问题:

  • 文章内容过于理论,与文章标题严重不符 为了保证不被嫌弃,马上开启新篇章:专业实战篇。
图片
图片

根据规划,本篇将分三个系列对Kafka基础操作进行演练

  • Topic 针对Topic的CRUD操作,并且详细区分分区与副本的概念

  • Producer 消息生产者

  • Consumer 消息消费者

除此之外,还会为大家展示在Kafka在ZK中存储的数据信息,那么我们来开始吧~

实战

Topic实战

在Kafka中,Topic是一个虚拟的概念,本身不承担任何和数据相关的作用。但是作为生产者和消费者间桥梁,Topic起到了及其重要的作用。任意生产或者消费都是针对Topic来做的操作。

在Kafka中通过kafka-topic.sh进行操作,当直接执行kafka-topic.sh脚本,会出现如下提示

Create, delete, describe, or change a topic.并且还附带出相关的全部附带参数信息

以下内容很重要:

  • kafka-topic.sh执行时,必须携带一个功能:如CRUD。
  • 其次,需要注意参数信息中出现REQUIRED字样的 举点例子
# 创建topic
kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --topic newTopic001 --partitions 3 --replication-factor 2

# 查看topic列表
kafka-topics.sh --bootstrap-server master:9092,node01:9092,node02:9092 --list

# 查看指定topic的详情
kafka-topics.sh --bootstrap-server master:9092,node01:9092,node02:9092 --describe --topic newTopic001

以上的操作需要注意几个参数

参数 是否必填 描述
bootstrap-server kafak中broker节点的链接信息,通过host:port的形式跟在后面;如果是集群描述,则已host1:port1,host2:port2的形式
topic - 主题名称,在create、describe、delete下必填
create - 创建一个topic
delete - 删除topic
describe - 查看指定topic的详细信息
list - 查看当前kafka下的topic列表信息
partitions - 指定新创建topic的分区数,这里指定不能超过broker的节点数
当前配置不能超过broker节点数以上就是针对kafka-topic.sh操作的相关核心参数。大家可以自己去做一个实践操作~

分区副本默认情况下有一套自己的放置策略,当然也是让我们自己自定义的。 别急,精彩内容都在~

彩蛋:自定义分区副本节点

程序员要有程序员的基本素养,虽然我们不干运维的工作,但是我们应该清楚一点:Broker节点所在服务器资源,尤其是磁盘空间,不一定是完全一样的。

此时,我们就不能采用默认创建Topic的方式,而是需要通过人工干预,将Topic的分区副本放置在不同的服务器节点上,以便更大限度的利用服务器资源。

接下来将为大家演示整个手动干预的过程,首先我们先来创建一个演示Topic,然后查看整体信息

kafka-topics.sh --bootstrap-server master:9092,node01:9092 --create --topic test-reassignment --partitions 3 --replication-factor 2
图片
图片

关于创建Topic时默认的分区副本放置策略,我们暂时先不聊。它本身的放置策略这个我们在Broker中细说,但是从整体上来看,我们能够发现默认的副本放置相对比时较均匀的。

均匀放置可不行,接下来我们开始演示如何手动干预。这里需要用到另一个脚本kafka-reassign-partitions.sh,其他的暂且不看,主要观察如下参数

  • --reassignment-json-file 这个参数需要我们创建一个JSON文件,然后通过他提供的相关实例,将我们的副本存储计划进行编写
{
    "version"1,
    "partitions": [
        {
            "topic""test-reassignment",
            "partition"0,
            "replicas": [0,1,2]
        },
        {
            "topic""test-reassignment",
            "partition"1,
            "replicas": [1,2]
        },
        {
            "topic""test-reassignment",
            "partition"2,
            "replicas": [1]
        }
    ]
}

随后我们通过--execute指令对存储计划进行执行,当输出如下结果表示计划执行成功

kafka-reassign-partitions.sh --bootstrap-server master:9092,node01:9092 --reassignment-json-file reassignment-json-file.json --execute
图片
图片

随后开始关键性的验证流程,还是相同的参数,不过将--execute换为--verify验证完成之后,整个存储计划执行结束

kafka-reassign-partitions.sh --bootstrap-server master:9092,node01:9092 --reassignment-json-file reassignment-json-file.json --verify
图片
图片

再次查看Topic的详细信息,发现在Replicas处的节点信息已经发生了改变,随之Isr相对应的变化,说明了本次人工干预副本放置已成功。

图片
图片

关于其他更多内容,我们后面一步步揭秘~

Producer实战

下面我们进入到Producer的实战环节

# 生产者最小执行单位
kafka-console-producer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092
图片
图片

本质上了解到上面的程序就已经可以了,但是作为一名资深猿,不允许大家了解的这么片面。

那么接下来我们一起看看其他的参数信息

参数 是否必填 描述
bootstrap-server kafak中broker节点的链接信息,通过host:port的形式跟在后面;如果是集群描述,则已host1:port1,host2:port2的形式
topic 主题名称,
牢记,该参数能够提高Producer的吞吐量更多参数暂时也用不到,随后大家自行尝试

Consumer实战

Producer端负责生产消息,那么Consumer就负责对消息进行消费。到此能够对整个生产-消费的流程形成闭环。

接下来我们来看看如何进行消费吧

# 每次启动都从第一条消息进行消费
kafka-console-consumer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092 --from-beginning

# 设置分组,只会消费最新的消息
kafka-console-consumer.sh --topic newTopic001 --bootstrap-server master:9092,node01:9092,node02:9092 --group topic_group_001
图片
图片

QA:--group的配置

后面当我们演示了通过Java程序来执行消费者的时候会发现:

  • 消费者必须要配置group 而在kafka-console-consumer.sh并没有强制要求,主要是因为在脚本模式下会对group进行默认配置,所以脚本模式并不需要这样的强制要求

QA:--from-beginning的配置

当前消费者会重复消费本Topic内的消息,在实际开发过程中也需要根据实际业务酌情考量

那行,其他的都不说了,接下来放松一下,看个动图:感受一下生产者和消费者间的联动

图片
图片

下期预告

到了这里,本次有关Kafka的基本实战操作就此结束。

关注我,后面带你详细了解Producer生产者

- END -

分类:

后端

标签:

后端

作者介绍

俗世游子
V1