本文最后更新于 320 天前,其中的信息可能已经过时,如有错误请发送邮件到wuxianglongblog@163.com
kafka常用的脚本使用案例
一.kafka集群topic管理
1.查看现有的topic名称
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
[root@elk101.oldboyedu.com ~]#
温馨提示:
(1)我们可以基于"--zookeeper"指令去zookeeper查询现有的topic信息;
(2)我们也可以基于"--bootstrap-server"指令去kafka broker查询现有的topic信息,官方推荐使用"--bootstrap-server"的方式去管理topic。
2.创建topic
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --create --zookeeper 172.200.1.102:2181/kafka --create --topic myNginx --partitions 3 --replication-factor 2
Created topic myNginx.
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --zookeeper 172.200.1.102:2181/kafka --list
myNginx
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
myNginx
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --create --topic demo2021 --partitions 2 --replication-factor 3
Created topic demo2021.
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --create --topic demo2022 --partitions 2 --replication-factor 5 # 注意哈,创建的副本数必须小于等于集群的数量!
Error while executing topic command : Replication factor: 5 larger than available brokers: 3.
[2021-05-07 15:03:59,850] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3.
(kafka.admin.TopicCommand$)
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
demo2021
myNginx
[root@elk101.oldboyedu.com ~]#
温馨提示:
--topic:
指定topic的名称。
--partitions:
指定分区数。
--replication-factor:
指定副本数。
3.查看某个topic的详细信息
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
[root@elk101.oldboyedu.com ~]#
温馨提示:
如果不使用"--topic"指定topic的名称,则默认查看所有的topic信息哟。
4.修改topic的信息
(1)修改topic的分区数量(分区数只能扩容,无法缩容!)
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --alter --topic myNginx --partitions 8 # 将分区数设置为8,则由默认的3个分区扩容为8个分区。
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --describe --topic myNginx
Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 8 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
Topic: myNginx Partition: 3 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: myNginx Partition: 4 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: myNginx Partition: 5 Leader: 103 Replicas: 103,102 Isr: 103,102
Topic: myNginx Partition: 6 Leader: 101 Replicas: 101,102 Isr: 101,102
Topic: myNginx Partition: 7 Leader: 102 Replicas: 102,103 Isr: 102,103
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --alter --topic myNginx --partitions 5 # 将分区数设置为5会失败,因为分区数只能扩容,无法缩容!
Error while executing topic command : Topic currently has 8 partitions, which is higher than the requested 5.
[2021-05-07 14:53:45,130] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 8 partitions, which is higher than the requested 5.
(kafka.admin.TopicCommand$)
[root@elk101.oldboyedu.com ~]#
(2)修改topic的副本数量(副本数只能小于等于集群的数量)
副本数修改是比较麻烦的,需要我们手动修改json文件,详情可参考我之前的笔记: https://www.cnblogs.com/oldboy/p/9808125.html
5. 删除topic
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
myNginx
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --delete --topic myNginx
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --bootstrap-server 172.200.1.101:9092 --list
[root@elk101.oldboyedu.com ~]#
温馨提示:
如下图所示,当我们删除topic后,数据并不会立即被删除,而是会间隔一段时间后才会被真正的删除,这个间隔时间大约在3-5分钟左右(我没有做秒表计算,只是做了一个大概估计,在演示的时候可以做一个记时!)。
二.kafka集群生产者管理
1.创建一个生产者往topic写入数据
[root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic myNginx --broker-list 172.200.1.101:9092
>hello world!
>
温馨提示:
生产者创建成功后,我们可以就手动自定义写入测试数据,而后开启一个消费者进行数据消费,如果能正常获取数据,则说明集群启动是正常运行的。
三.kafka集群消费者管理
1.从头开始消费数据
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic myNginx --bootstrap-server 172.200.1.101:9092 --from-beginning
hello world!
温馨提示:
我们可以不使用"--from-beginning"参数,则有可能生产者要比消费者先启动一段时间,而且也将数据写入到kafka集群中了,但如果我们不想要在消费者启动时之前的数据可以不加该参数,如果使用该参数则表示从头进行消费。
2.查看消费者组信息
[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list
console-consumer-43247
console-consumer-52021
test-consumer-group
[root@elk101.oldboyedu.com ~]#
3.消费者组案例
(1)创建"oldboy"的topic:
[root@elk101.oldboyedu.com ~]# kafka-topics.sh --create --zookeeper 172.200.1.102:2181/kafka --create --topic oldboy --partitions 3 --replication-factor 2
Created topic oldboy.
[root@elk101.oldboyedu.com ~]#
(2)修改消费者的配置文件
[root@elk101.oldboyedu.com ~]# vim /oldboy/softwares/kafka/config/consumer.properties
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# grep oldboy /oldboy/softwares/kafka/config/consumer.properties
group.id=oldboyedu
[root@elk101.oldboyedu.com ~]#
(3)单独开启一个终端查看"__consumer_offsets"的topic内容:
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$Offs
etsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning
(4)创建生产者
[root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic oldboy --broker-list 172.200.1.101:9092
>hello oldboy
>123
>456
>789
>
(5)基于消费者的配置文件启动一个消费者
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer.config /oldboy/softwares/kafka/config/consumer.properties
hello oldboy
123
789
(6)基于命令行参数启动一个消费者
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer-property group.id=oldboyedu
456
(7)查看当前的消费者组信息
[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list
oldboyedu
test-consumer-group
[root@elk101.oldboyedu.com ~]#
温馨提示:
(1)上面的案例启动了1个生产者,2个消费者。但同一时刻只有一个消费者接收到生产者的消息,不可能2个消费者同时接收到生产者发送的消息哟~
(2)由于我们上面创建的topic是3个,接下来我们在启动3个消费者,加上上面的2个消费者,总共就是5个消费者,而在同一个消费者组中有5个消费者,而分区数仅有3个,最终会有2个消费者拿不到数据哟;
(3)当一个消费者新加入了新的消费者时,分区会重新均衡哟,也就说之前能接收数据的消费者,在重新均衡的时候可能会分配不到分区哟;
(4)关于上面的"--bootstrap-server"参数后面,
四.查看"__consumer_offsets"的topic内容实操案例
1.修改消费者的配置文件("consumer.properties"),可选操作
[root@elk101.oldboyedu.com ~]# egrep -v "^#|^$" /oldboy/softwares/kafka/config/consumer.properties # 个人觉得,如果有必要修改的化,可以只修改group.id即可,其它2个参数我发现不修改也可以!
bootstrap.servers=172.200.1.101:9092
group.id=test-consumer-group
exclude.internal.topics=false # 该参数网上很多人都信誓旦旦的说必须将其改为false,但通过我的实际测试,该参数我们设置为true和false都不会影响下面的实验。
[root@elk101.oldboyedu.com ~]#
温馨提示:
我们可以用正则来订阅topic,而__consumer_offsets是kafka server中一个默认的topic名,当用户有一个topic叫sumer_off,且用正则的方式来订阅topic,如下所示:
Pattern pattern = Pattern.compile(".*umer_of.*");
consumer.subscribe(pattern);
那么正则会匹配到__consumer_offsets这个内部topic,所以就用这个参数来限制是否暴露内部topic,当设置为true是,就算正则匹配中了内部topic,也不会消费,此时只能通过订阅的方式来消费内部topic,当这个参数设置为false时,如果正则匹配到内部topic,就会消费到内部topic的数据。
参考链接:
https://blog.csdn.net/vim_wj/article/details/78840989
2.查看消费者组信息并计算其在"__consumer_offsets"的topic上offset提交的分区数编号。
[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list
console-consumer-85629
[root@elk101.oldboyedu.com ~]#
温馨提示:
(1)Kafka会使用下面公式计算消费者组位移保存在名为"__consumer_offsets"的topic的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
(2)编写JAVA代码计算"console-consumer-85629"对应的分区数,默认情况下,"__consumer_offsets"有50个分区:
[root@elk101.oldboyedu.com ~/kafka]# vim get_partition.java
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# cat get_partition.java # 注意当前的文件名称要和java代码中main方法的类名一致哟~
/*
* author:
* oldboy
* blog:
* http://www.cnblogs.com/oldboy
* EMAIL:
* y1053419035@qq.com
*
*/
public class get_partition {
public static void main(String[] args) throws Exception {
// 注意哈,"__consumer_offsets"的topic默认分区是50个哟~
System.out.println(Math.abs(args[0].hashCode()) % 50);
}
}
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# ll
总用量 4
-rw-r--r-- 1 root root 359 5月 7 17:02 get_partition.java
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# javac get_partition.java
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# ll
总用量 8
-rw-r--r-- 1 root root 547 5月 7 17:04 get_partition.class
-rw-r--r-- 1 root root 359 5月 7 17:02 get_partition.java
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# java get_partition console-consumer-51563 # 注意哈,该组名就是上面我们查询出来的哟~
31
[root@elk101.oldboyedu.com ~/kafka]#
[root@elk101.oldboyedu.com ~/kafka]# java get_partition "console-consumer-51563"
31
[root@elk101.oldboyedu.com ~/kafka]#
3.查看消费者组的偏移量信息
(1)基于配置文件查看消费者组的偏移量信息
[root@elk101.oldboyedu.com ~]# egrep -v "^#|^$" /oldboy/softwares/kafka/config/consumer.properties # 修改消费者的配置文件
bootstrap.servers=172.200.1.101:9092
group.id=test-consumer-group
exclude.internal.topics=false
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$Off
setsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning # 注意哈,下面的数据基本上每3秒刷新一次。
...
[test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=708, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378740504, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378878071, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=760, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378878071, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378883072, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=810, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378883072, expireTimestamp=None)
...
(2)基于"__consumer_offsets"的topic上的分区编号查看消费者组的偏移量信息
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad
ataManager\$OffsetsMessageFormatter" --from-beginning # 注意哈,当我从分区为"11"的编号去获取数据发现获取的数据量为0条。
^CProcessed a total of 0 messages
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 31 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad
ataManager\$OffsetsMessageFormatter" --from-beginning # 注意哈,当我从分区为"31"的编号去获取数据发现获取的数据为707条,说明数据的确是提交到31号分区啦!
...
[test-consumer-group,__consumer_offsets,8]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620375184336, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,21]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,4]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None)
...
^CProcessed a total of 707 messages
[root@elk101.oldboyedu.com ~]#
(3)"_consumer_offsets topic"的日志格式说明:
日志格式如下所示:(可以理解为"Key::Value"格式)
[Group, Topic, Partition]::OffsetAndMetadata(Offset, leaderEpoch, Metadata, commitTimestamp, expireTimestamp]
相关字段说明如下:
Group:
对应消费者组的groupid,这条消息要发送到"__consumer_offset"的哪个分区,是由这个字段决定的。
值得注意是,此处设置的是消费者组名称,而非消费者组内的某个消费者。这样设计的好处是,当一个消费者组内的消费者数量有所变动时会导致的重新rebalance。
而消费者组内的消费者重新分配到新的partition时,它们是知道该partition被消费到哪里的,因为在broker都有对应消费者组关于某个分区以消费的offset。
Topic:
主题名称。
Partition:
主题的分区编号。
OffsetAndMetadata:
偏移量和元数据信息。其包含以下五项内容:
offset:
偏移量信息。
leaderEpoch:
Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:
备份数据丢失
备份数据不一致
Kafka 0.11版本之后引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。
Metadata:
自定义元数据信息,通常情况下为空,因为这种场景很少会用到。
commitTimestamp:
提交到kafka的时间。
expireTimestamp:
过期时间, 当数据过期时会有一个定时任务去清理过期的消息。