本文最后更新于 393 天前,其中的信息可能已经过时,如有错误请发送邮件到 wuxianglongblog@163.com
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| |
| [root@elk101.oldboyedu.com ~] |
| |
| 温馨提示: |
| (1)我们可以基于"--zookeeper"指令去zookeeper查询现有的topic信息; |
| (2)我们也可以基于"--bootstrap-server"指令去kafka broker查询现有的topic信息,官方推荐使用"--bootstrap-server"的方式去管理topic。 |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| Created topic myNginx. |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| myNginx |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| myNginx |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| Created topic demo2021. |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| Error while executing topic command : Replication factor: 5 larger than available brokers: 3. |
| [2021-05-07 15:03:59,850] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3. |
| (kafka.admin.TopicCommand$) |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| demo2021 |
| myNginx |
| [root@elk101.oldboyedu.com ~] |
| |
| 温馨提示: |
| --topic: |
| 指定topic的名称。 |
| --partitions: |
| 指定分区数。 |
| --replication-factor: |
| 指定副本数。 |
| [root@elk101.oldboyedu.com ~] |
| Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824 |
| Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103 |
| Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101 |
| Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102 |
| [root@elk101.oldboyedu.com ~] |
| |
| 温馨提示: |
| 如果不使用"--topic"指定topic的名称,则默认查看所有的topic信息哟。 |
| (1)修改topic的分区数量(分区数只能扩容,无法缩容!) |
| [root@elk101.oldboyedu.com ~] |
| Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824 |
| Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103 |
| Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101 |
| Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102 |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| Topic: myNginx TopicId: yq-BQW6ERHueoOdeYgfoKQ PartitionCount: 8 ReplicationFactor: 2 Configs: segment.bytes=1073741824 |
| Topic: myNginx Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103 |
| Topic: myNginx Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101 |
| Topic: myNginx Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102 |
| Topic: myNginx Partition: 3 Leader: 101 Replicas: 101,103 Isr: 101,103 |
| Topic: myNginx Partition: 4 Leader: 102 Replicas: 102,101 Isr: 102,101 |
| Topic: myNginx Partition: 5 Leader: 103 Replicas: 103,102 Isr: 103,102 |
| Topic: myNginx Partition: 6 Leader: 101 Replicas: 101,102 Isr: 101,102 |
| Topic: myNginx Partition: 7 Leader: 102 Replicas: 102,103 Isr: 102,103 |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| Error while executing topic command : Topic currently has 8 partitions, which is higher than the requested 5. |
| [2021-05-07 14:53:45,130] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 8 partitions, which is higher than the requested 5. |
| (kafka.admin.TopicCommand$) |
| [root@elk101.oldboyedu.com ~] |
| |
| (2)修改topic的副本数量(副本数只能小于等于集群的数量) |
| 副本数修改是比较麻烦的,需要我们手动修改json文件,详情可参考我之前的笔记: https://www.cnblogs.com/oldboy/p/9808125.html |
| [root@elk101.oldboyedu.com ~] |
| myNginx |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| |
| [root@elk101.oldboyedu.com ~] |
| |
| 温馨提示: |
| 如下图所示,当我们删除topic后,数据并不会立即被删除,而是会间隔一段时间后才会被真正的删除,这个间隔时间大约在3-5分钟左右(我没有做秒表计算,只是做了一个大概估计,在演示的时候可以做一个记时!)。 |
| [root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic myNginx --broker-list 172.200.1.101:9092 |
| >hello world! |
| > |
| |
| 温馨提示: |
| 生产者创建成功后,我们可以就手动自定义写入测试数据,而后开启一个消费者进行数据消费,如果能正常获取数据,则说明集群启动是正常运行的。 |
| [root@elk101.oldboyedu.com ~] |
| hello world! |
| |
| 温馨提示: |
| 我们可以不使用"--from-beginning"参数,则有可能生产者要比消费者先启动一段时间,而且也将数据写入到kafka集群中了,但如果我们不想要在消费者启动时之前的数据可以不加该参数,如果使用该参数则表示从头进行消费。 |
| [root@elk101.oldboyedu.com ~] |
| console-consumer-43247 |
| console-consumer-52021 |
| test-consumer-group |
| [root@elk101.oldboyedu.com ~] |
| (1)创建"oldboy"的topic: |
| [root@elk101.oldboyedu.com ~] |
| Created topic oldboy. |
| [root@elk101.oldboyedu.com ~] |
| |
| (2)修改消费者的配置文件 |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| group.id=oldboyedu |
| [root@elk101.oldboyedu.com ~] |
| |
| (3)单独开启一个终端查看"__consumer_offsets"的topic内容: |
| [root@elk101.oldboyedu.com ~] |
| etsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning |
| |
| (4)创建生产者 |
| [root@elk101.oldboyedu.com ~]# kafka-console-producer.sh --topic oldboy --broker-list 172.200.1.101:9092 |
| >hello oldboy |
| >123 |
| >456 |
| >789 |
| > |
| |
| (5)基于消费者的配置文件启动一个消费者 |
| [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer.config /oldboy/softwares/kafka/config/consumer.properties |
| hello oldboy |
| 123 |
| 789 |
| |
| (6)基于命令行参数启动一个消费者 |
| [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server 172.200.1.101:9092 --topic oldboy --consumer-property group.id=oldboyedu |
| 456 |
| |
| (7)查看当前的消费者组信息 |
| [root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list |
| oldboyedu |
| test-consumer-group |
| [root@elk101.oldboyedu.com ~]# |
| |
| 温馨提示: |
| (1)上面的案例启动了1个生产者,2个消费者。但同一时刻只有一个消费者接收到生产者的消息,不可能2个消费者同时接收到生产者发送的消息哟~ |
| (2)由于我们上面创建的topic是3个,接下来我们在启动3个消费者,加上上面的2个消费者,总共就是5个消费者,而在同一个消费者组中有5个消费者,而分区数仅有3个,最终会有2个消费者拿不到数据哟; |
| (3)当一个消费者新加入了新的消费者时,分区会重新均衡哟,也就说之前能接收数据的消费者,在重新均衡的时候可能会分配不到分区哟; |
| (4)关于上面的"--bootstrap-server"参数后面, |
| |
| [root@elk101.oldboyedu.com ~]# egrep -v "^#|^$" /oldboy/softwares/kafka/config/consumer.properties # 个人觉得,如果有必要修改的化,可以只修改group.id即可,其它2个参数我发现不修改也可以! |
| bootstrap.servers=172.200.1.101:9092 |
| group.id=test-consumer-group |
| exclude.internal.topics=false # 该参数网上很多人都信誓旦旦的说必须将其改为false,但通过我的实际测试,该参数我们设置为true和false都不会影响下面的实验。 |
| [root@elk101.oldboyedu.com ~]# |
| |
| 温馨提示: |
| 我们可以用正则来订阅topic,而__consumer_offsets是kafka server中一个默认的topic名,当用户有一个topic叫sumer_off,且用正则的方式来订阅topic,如下所示: |
| Pattern pattern = Pattern.compile(".*umer_of.*"); |
| consumer.subscribe(pattern); |
| 那么正则会匹配到__consumer_offsets这个内部topic,所以就用这个参数来限制是否暴露内部topic,当设置为true是,就算正则匹配中了内部topic,也不会消费,此时只能通过订阅的方式来消费内部topic,当这个参数设置为false时,如果正则匹配到内部topic,就会消费到内部topic的数据。 |
| |
| 参考链接: |
| https: |
| [root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server 172.200.1.101:9092 --list |
| console-consumer-85629 |
| [root@elk101.oldboyedu.com ~]# |
| |
| 温馨提示: |
| (1)Kafka会使用下面公式计算消费者组位移保存在名为"__consumer_offsets"的topic的哪个分区上: |
| Math.abs(groupID.hashCode()) % numPartitions |
| |
| (2)编写JAVA代码计算"console-consumer-85629"对应的分区数,默认情况下,"__consumer_offsets"有50个分区: |
| [root@elk101.oldboyedu.com ~/kafka]# vim get_partition.java |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# cat get_partition.java # 注意当前的文件名称要和java代码中main方法的类名一致哟~ |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| public class get_partition { |
| public static void main(String[] args) throws Exception { |
| |
| System.out.println(Math.abs(args[0].hashCode()) % 50); |
| } |
| } |
| |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# ll |
| 总用量 4 |
| -rw-r--r-- 1 root root 359 5月 7 17:02 get_partition.java |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# javac get_partition.java |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# ll |
| 总用量 8 |
| -rw-r--r-- 1 root root 547 5月 7 17:04 get_partition.class |
| -rw-r--r-- 1 root root 359 5月 7 17:02 get_partition.java |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# java get_partition console-consumer-51563 # 注意哈,该组名就是上面我们查询出来的哟~ |
| 31 |
| [root@elk101.oldboyedu.com ~/kafka]# |
| [root@elk101.oldboyedu.com ~/kafka]# java get_partition "console-consumer-51563" |
| 31 |
| [root@elk101.oldboyedu.com ~/kafka]# |
| (1)基于配置文件查看消费者组的偏移量信息 |
| [root@elk101.oldboyedu.com ~] |
| bootstrap.servers=172.200.1.101:9092 |
| group.id=test-consumer-group |
| exclude.internal.topics=false |
| [root@elk101.oldboyedu.com ~] |
| [root@elk101.oldboyedu.com ~] |
| setsMessageFormatter" --consumer.config /oldboy/softwares/kafka/config/consumer.properties --from-beginning # 注意哈,下面的数据基本上每3秒刷新一次。 |
| |
| ... |
| |
| [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=708, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378740504, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378878071, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=760, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378878071, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,28]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620378883072, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,31]::OffsetAndMetadata(offset=810, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620378883072, expireTimestamp=None) |
| |
| ... |
| |
| (2)基于"__consumer_offsets"的topic上的分区编号查看消费者组的偏移量信息 |
| [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad |
| ataManager\$OffsetsMessageFormatter" --from-beginning # 注意哈,当我从分区为"11"的编号去获取数据发现获取的数据量为0条。 |
| ^CProcessed a total of 0 messages |
| [root@elk101.oldboyedu.com ~]# |
| [root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --topic __consumer_offsets --partition 31 --bootstrap-server 172.200.1.101:9092 --formatter "kafka.coordinator.group.GroupMetad |
| ataManager\$OffsetsMessageFormatter" --from-beginning # 注意哈,当我从分区为"31"的编号去获取数据发现获取的数据为707条,说明数据的确是提交到31号分区啦! |
| |
| ... |
| |
| [test-consumer-group,__consumer_offsets,8]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1620375184336, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,21]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None) |
| [test-consumer-group,__consumer_offsets,4]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1620375184336, expireTimestamp=None) |
| |
| ... |
| |
| ^CProcessed a total of 707 messages |
| [root@elk101.oldboyedu.com ~]# |
| |
| (3)"_consumer_offsets topic"的日志格式说明: |
| 日志格式如下所示:(可以理解为"Key::Value"格式) |
| [Group, Topic, Partition]::OffsetAndMetadata(Offset, leaderEpoch, Metadata, commitTimestamp, expireTimestamp] |
| 相关字段说明如下: |
| Group: |
| 对应消费者组的groupid,这条消息要发送到"__consumer_offset"的哪个分区,是由这个字段决定的。 |
| 值得注意是,此处设置的是消费者组名称,而非消费者组内的某个消费者。这样设计的好处是,当一个消费者组内的消费者数量有所变动时会导致的重新rebalance。 |
| 而消费者组内的消费者重新分配到新的partition时,它们是知道该partition被消费到哪里的,因为在broker都有对应消费者组关于某个分区以消费的offset。 |
| Topic: |
| 主题名称。 |
| Partition: |
| 主题的分区编号。 |
| OffsetAndMetadata: |
| 偏移量和元数据信息。其包含以下五项内容: |
| offset: |
| 偏移量信息。 |
| leaderEpoch: |
| Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括: |
| 备份数据丢失 |
| 备份数据不一致 |
| Kafka 0.11版本之后引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。 |
| Metadata: |
| 自定义元数据信息,通常情况下为空,因为这种场景很少会用到。 |
| commitTimestamp: |
| 提交到kafka的时间。 |
| expireTimestamp: |
| 过期时间, 当数据过期时会有一个定时任务去清理过期的消息。 |