004、kafka常用的脚本使用案例
本文最后更新于 320 天前,其中的信息可能已经过时,如有错误请发送邮件到wuxianglongblog@163.com

kafka常用的脚本使用案例

一.kafka集群topic管理

1.查看现有的topic名称

[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list

[root@elk101.oldboyedu.com ~]# 

温馨提示:
    (1)我们可以基于"--zookeeper"指令去zookeeper查询现有的topic信息;
    (2)我们也可以基于"--bootstrap-server"指令去kafka broker查询现有的topic信息,官方推荐使用"--bootstrap-server"的方式去管理topic。

2.创建topic

[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list

[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --create --zookeeper 172.200.1.102:2181/kafka --create --topic myNginx --partitions 3 --replication-factor 2
Created topic myNginx.
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
myNginx
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
myNginx
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --create --topic demo2021 --partitions 2  --replication-factor 3
Created topic demo2021.
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --create --topic demo2022 --partitions 2  --replication-factor 5  # 注意哈,创建的副本数必须小于等于集群的数量!
Error while executing topic command : Replication factor: 5 larger than available brokers: 3.
[2021-05-07 15:03:59,850] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
demo2021
myNginx
[root@elk101.oldboyedu.com ~]# 

温馨提示:
    --topic:
        指定topic的名称。
    --partitions:
        指定分区数。
    --replication-factor:
        指定副本数。

3.查看某个topic的详细信息

[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
Topic: myNginx  TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3   ReplicationFactor: 2    Configs: segment.bytes=1073741824
    Topic: myNginx  Partition: 0    Leader: 101 Replicas: 101,103   Isr: 101,103
    Topic: myNginx  Partition: 1    Leader: 102 Replicas: 102,101   Isr: 102,101
    Topic: myNginx  Partition: 2    Leader: 103 Replicas: 103,102   Isr: 103,102
[root@elk101.oldboyedu.com ~]# 

温馨提示:
    如果不使用"--topic"指定topic的名称,则默认查看所有的topic信息哟。

4.修改topic的信息

    (1)修改topic的分区数量(分区数只能扩容,无法缩容!)
        [root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
        Topic: myNginx  TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3   ReplicationFactor: 2    Configs: segment.bytes=1073741824
            Topic: myNginx  Partition: 0    Leader: 101 Replicas: 101,103   Isr: 101,103
            Topic: myNginx  Partition: 1    Leader: 102 Replicas: 102,101   Isr: 102,101
            Topic: myNginx  Partition: 2    Leader: 103 Replicas: 103,102   Isr: 103,102
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --alter --topic myNginx --partitions 8  # 将分区数设置为8,则由默认的3个分区扩容为8个分区。
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
        Topic: myNginx  TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 8   ReplicationFactor: 2    Configs: segment.bytes=1073741824
            Topic: myNginx  Partition: 0    Leader: 101 Replicas: 101,103   Isr: 101,103
            Topic: myNginx  Partition: 1    Leader: 102 Replicas: 102,101   Isr: 102,101
            Topic: myNginx  Partition: 2    Leader: 103 Replicas: 103,102   Isr: 103,102
            Topic: myNginx  Partition: 3    Leader: 101 Replicas: 101,103   Isr: 101,103
            Topic: myNginx  Partition: 4    Leader: 102 Replicas: 102,101   Isr: 102,101
            Topic: myNginx  Partition: 5    Leader: 103 Replicas: 103,102   Isr: 103,102
            Topic: myNginx  Partition: 6    Leader: 101 Replicas: 101,102   Isr: 101,102
            Topic: myNginx  Partition: 7    Leader: 102 Replicas: 102,103   Isr: 102,103
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --alter --topic myNginx --partitions 5  # 将分区数设置为5会失败,因为分区数只能扩容,无法缩容!
        Error while executing topic command : Topic currently has 8 partitions, which is higher than the requested 5.
        [2021-05-07 14:53:45,130] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 8 partitions, which is higher than the requested 5.
         (kafka.admin.TopicCommand$)
        [root@elk101.oldboyedu.com ~]# 

    (2)修改topic的副本数量(副本数只能小于等于集群的数量)
        副本数修改是比较麻烦的,需要我们手动修改json文件,详情可参考我之前的笔记: https://www.cnblogs.com/oldboy/p/9808125.html

5. 删除topic

[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
myNginx
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --delete --topic myNginx
[root@elk101.oldboyedu.com ~]# 
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list

[root@elk101.oldboyedu.com ~]# 

温馨提示:
    如下图所示,当我们删除topic后,数据并不会立即被删除,而是会间隔一段时间后才会被真正的删除,这个间隔时间大约在3-5分钟左右(我没有做秒表计算,只是做了一个大概估计,在演示的时候可以做一个记时!)。

二.kafka集群生产者管理

1.创建一个生产者往topic写入数据

[root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic myNginx --broker-list 172.200.1.101:9092
>hello world!
>

温馨提示:
    生产者创建成功后,我们可以就手动自定义写入测试数据,而后开启一个消费者进行数据消费,如果能正常获取数据,则说明集群启动是正常运行的。

三.kafka集群消费者管理

1.从头开始消费数据

[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic myNginx --bootstrap-server 172.200.1.101:9092 --from-beginning
hello world!

温馨提示:
    我们可以不使用"--from-beginning"参数,则有可能生产者要比消费者先启动一段时间,而且也将数据写入到kafka集群中了,但如果我们不想要在消费者启动时之前的数据可以不加该参数,如果使用该参数则表示从头进行消费。

2.查看消费者组信息

[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list 
console-consumer-43247
console-consumer-52021
test-consumer-group
[root@elk101.oldboyedu.com ~]# 

3.消费者组案例

    (1)创建"oldboy"的topic:
        [root@elk101.oldboyedu.com ~]# kafka-topics.sh --create --zookeeper 172.200.1.102:2181/kafka --create --topic oldboy --partitions 3 --replication-factor 2
        Created topic oldboy.
        [root@elk101.oldboyedu.com ~]# 

    (2)修改消费者的配置文件
        [root@elk101.oldboyedu.com ~]# vim /oldboy/softwares/kafka/config/consumer.properties 
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# grep oldboy /oldboy/softwares/kafka/config/consumer.properties 
        group.id=oldboyedu
        [root@elk101.oldboyedu.com ~]# 

    (3)单独开启一个终端查看"__consumer_offsets"的topic内容:
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$Offs
        etsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning

    (4)创建生产者
        [root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic oldboy --broker-list 172.200.1.101:9092
        >hello oldboy
        >123
        >456
        >789
        >

    (5)基于消费者的配置文件启动一个消费者
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer.config /oldboy/softwares/kafka/config/consumer.properties
        hello oldboy
        123
        789

    (6)基于命令行参数启动一个消费者
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer-property group.id=oldboyedu
        456

    (7)查看当前的消费者组信息
        [root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list 
        oldboyedu
        test-consumer-group
        [root@elk101.oldboyedu.com ~]# 

    温馨提示:
        (1)上面的案例启动了1个生产者,2个消费者。但同一时刻只有一个消费者接收到生产者的消息,不可能2个消费者同时接收到生产者发送的消息哟~
        (2)由于我们上面创建的topic是3个,接下来我们在启动3个消费者,加上上面的2个消费者,总共就是5个消费者,而在同一个消费者组中有5个消费者,而分区数仅有3个,最终会有2个消费者拿不到数据哟;
        (3)当一个消费者新加入了新的消费者时,分区会重新均衡哟,也就说之前能接收数据的消费者,在重新均衡的时候可能会分配不到分区哟;
        (4)关于上面的"--bootstrap-server"参数后面,

四.查看"__consumer_offsets"的topic内容实操案例

1.修改消费者的配置文件("consumer.properties"),可选操作

[root@elk101.oldboyedu.com ~]# egrep -v "^#|^$" /oldboy/softwares/kafka/config/consumer.properties  # 个人觉得,如果有必要修改的化,可以只修改group.id即可,其它2个参数我发现不修改也可以!
bootstrap.servers=172.200.1.101:9092  
group.id=test-consumer-group
exclude.internal.topics=false  # 该参数网上很多人都信誓旦旦的说必须将其改为false,但通过我的实际测试,该参数我们设置为true和false都不会影响下面的实验。
[root@elk101.oldboyedu.com ~]# 

温馨提示:
    我们可以用正则来订阅topic,而__consumer_offsets是kafka server中一个默认的topic名,当用户有一个topic叫sumer_off,且用正则的方式来订阅topic,如下所示:
        Pattern pattern = Pattern.compile(".*umer_of.*");
        consumer.subscribe(pattern);
    那么正则会匹配到__consumer_offsets这个内部topic,所以就用这个参数来限制是否暴露内部topic,当设置为true是,就算正则匹配中了内部topic,也不会消费,此时只能通过订阅的方式来消费内部topic,当这个参数设置为false时,如果正则匹配到内部topic,就会消费到内部topic的数据。

参考链接:
    https://blog.csdn.net/vim_wj/article/details/78840989

2.查看消费者组信息并计算其在"__consumer_offsets"的topic上offset提交的分区数编号。

[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list 
console-consumer-85629
[root@elk101.oldboyedu.com ~]# 

温馨提示:
    (1)Kafka会使用下面公式计算消费者组位移保存在名为"__consumer_offsets"的topic的哪个分区上:
        Math.abs(groupID.hashCode()) % numPartitions

    (2)编写JAVA代码计算"console-consumer-85629"对应的分区数,默认情况下,"__consumer_offsets"有50个分区:
        [root@elk101.oldboyedu.com ~/kafka]# vim get_partition.java 
        [root@elk101.oldboyedu.com ~/kafka]#
        [root@elk101.oldboyedu.com ~/kafka]# cat get_partition.java  # 注意当前的文件名称要和java代码中main方法的类名一致哟~
        /*
        *  author:
        *    oldboy
        *  blog:
        *    http://www.cnblogs.com/oldboy
        *  EMAIL:
        *    y1053419035@qq.com
        *
        */

        public class get_partition {
            public static void main(String[] args) throws Exception {
                // 注意哈,"__consumer_offsets"的topic默认分区是50个哟~
                System.out.println(Math.abs(args[0].hashCode()) % 50);
            }
        }

        [root@elk101.oldboyedu.com ~/kafka]#
        [root@elk101.oldboyedu.com ~/kafka]# ll
        总用量 4
        -rw-r--r-- 1 root root 359 5月   7 17:02 get_partition.java
        [root@elk101.oldboyedu.com ~/kafka]# 
        [root@elk101.oldboyedu.com ~/kafka]# javac get_partition.java 
        [root@elk101.oldboyedu.com ~/kafka]# 
        [root@elk101.oldboyedu.com ~/kafka]# ll
        总用量 8
        -rw-r--r-- 1 root root 547 5月   7 17:04 get_partition.class
        -rw-r--r-- 1 root root 359 5月   7 17:02 get_partition.java
        [root@elk101.oldboyedu.com ~/kafka]# 
        [root@elk101.oldboyedu.com ~/kafka]# java  get_partition console-consumer-51563  # 注意哈,该组名就是上面我们查询出来的哟~
        31
        [root@elk101.oldboyedu.com ~/kafka]# 
        [root@elk101.oldboyedu.com ~/kafka]# java  get_partition "console-consumer-51563"
        31
        [root@elk101.oldboyedu.com ~/kafka]# 

3.查看消费者组的偏移量信息

    (1)基于配置文件查看消费者组的偏移量信息
        [root@elk101.oldboyedu.com ~]# egrep -v "^#|^$" /oldboy/softwares/kafka/config/consumer.properties  # 修改消费者的配置文件
        bootstrap.servers=172.200.1.101:9092
        group.id=test-consumer-group
        exclude.internal.topics=false
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$Off
setsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning  # 注意哈,下面的数据基本上每3秒刷新一次。

        ...

        [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=708, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378740504, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378878071, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=760, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378878071, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378883072, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=810, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378883072, expireTimestamp=None)

        ...

    (2)基于"__consumer_offsets"的topic上的分区编号查看消费者组的偏移量信息
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad
        ataManager\$OffsetsMessageFormatter"  --from-beginning  # 注意哈,当我从分区为"11"的编号去获取数据发现获取的数据量为0条。
        ^CProcessed a total of 0 messages
        [root@elk101.oldboyedu.com ~]# 
        [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 31 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad
        ataManager\$OffsetsMessageFormatter"  --from-beginning  # 注意哈,当我从分区为"31"的编号去获取数据发现获取的数据为707条,说明数据的确是提交到31号分区啦!

        ...

        [test-consumer-group,__consumer_offsets,8]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620375184336, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,21]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None)
        [test-consumer-group,__consumer_offsets,4]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None)

        ...

        ^CProcessed a total of 707 messages
        [root@elk101.oldboyedu.com ~]#  

    (3)"_consumer_offsets topic"的日志格式说明:
        日志格式如下所示:(可以理解为"Key::Value"格式)
            [Group, Topic, Partition]::OffsetAndMetadata(Offset, leaderEpoch, Metadata, commitTimestamp, expireTimestamp]
        相关字段说明如下:
            Group:
                对应消费者组的groupid,这条消息要发送到"__consumer_offset"的哪个分区,是由这个字段决定的。
                值得注意是,此处设置的是消费者组名称,而非消费者组内的某个消费者。这样设计的好处是,当一个消费者组内的消费者数量有所变动时会导致的重新rebalance。
                而消费者组内的消费者重新分配到新的partition时,它们是知道该partition被消费到哪里的,因为在broker都有对应消费者组关于某个分区以消费的offset。
            Topic:
                主题名称。
            Partition:
                主题的分区编号。
            OffsetAndMetadata:
                偏移量和元数据信息。其包含以下五项内容:
                    offset:
                        偏移量信息。
                    leaderEpoch:
                        Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:
                            备份数据丢失
                            备份数据不一致 
                        Kafka 0.11版本之后引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。
                    Metadata:
                        自定义元数据信息,通常情况下为空,因为这种场景很少会用到。
                    commitTimestamp:
                        提交到kafka的时间。
                    expireTimestamp:
                        过期时间, 当数据过期时会有一个定时任务去清理过期的消息。
谨此笔记,记录过往。凭君阅览,如能收益,莫大奢望。
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇