Kafka 命令整合

Kafka 命令整合

一、Topic Command

1.0、Kafka 启停

启动
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
停止
/opt/module/kafka/bin/kafka-server-stop.sh

1.1、Topic 创建

​​bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test​​

1.2、删除 Topic

​bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test​​

1.3、Topic 分区扩容

单个 Topic 扩容

​bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4​​

批量扩容 (将所有正则表达式匹配到的 Topic 分区扩容到 4 个)

​​sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --alter --partitions 4​​

1.4、查询 Topic 描述

1、查询单个 Topic

bin/kafka-topics.sh --topic test --bootstrap-server localhost:9092 --describe --exclude-internal​​

2、批量查询 Topic (正则表达式匹配,下面是查询所有 Topic)

​​sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --describe --exclude-internal​​

支持正则表达式匹配 Topic, 只需要将 topic 用双引号包裹起来。

1.5、查询 Topic 列表

1、查询所有 Topic 列表

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal​​

2、查询匹配 Topic 列表 (正则表达式)

查询​​test_create_​​开头的所有 Topic 列表。

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal --topic "user.*"

二、Config Command

2.1、查询配置

1、查询单个 Topic 配置 (只列举动态配置)

​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test​​ 
或者
​​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --entity-type topics --entity-name test​​

2、查询所有 Topic 配置 (包括内部 Topic)(只列举动态配置)

​​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1.85:9092 --entity-type topics​​

3、查询 Topic 的详细配置 (动态 + 静态)

只需要加上一个参数​​--all​​

其他配置 /clients/users/brokers/broker-loggers 的查询

同理 ;只需要将​​--entity-type​​ 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

查询 kafka 版本信息

bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --version​​

2.2、增删改配置 ​​--alter​​

删除配置: ​​--delete-config​​ k1=v1,k2=v2
添加 / 修改配置: ​​--add-config​​ k1,k2
选择类型: ​​--entity-type​​ (topics/clients/users/brokers/broker-loggers)
类型名称: ​​--entity-name​​

Topic 添加 / 修改动态配置​​
--add-config​​

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999​​

Topic 删除动态配置
--delete-config​​

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms​​

添加 / 删除配置同时执行

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms​​

其他配置同理,只需要类型改下​​--entity-type​​

类型有: (topics/clients/users/brokers/broker- loggers)   

默认配置
配置默认​​--entity-default​​

sh bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888​​

动态配置的默认配置是使用了节点 ​​​​;
优先级:指定动态配置 > 默认动态配置 > 静态配置

三、副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions

【kafka 运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移​​
【​​kafka 实战】分区重分配可能出现的问题和排查问题思路

四、Topic 的发送 kafka-console-producer.sh

4.1、生产无 key 消息

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2、生产有 key 消息

加上属性​​--property parse.key=true​​

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true

默认消息 key 与消息 value 间使用 “Tab 键” 进行分隔,所以消息 key 以及 value 中切勿使用转义字符 (\t)。

五、Topic 的消费 kafka-console-consumer.sh

5.1、新客户端从头消费​​--from-beginning​​ (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)

下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

5.2、正则表达式匹配 topic 进行消费​​--whitelist​​

​​消费所有的 topic​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

​​消费所有的 topic,并且还从头消费​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

5.3、显示 key 进行消费​​--property print.key=true​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

5.4、指定分区消费​​--partition​​ 指定起始偏移量消费​​--offset​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5.5、给客户端命名​​--group​​

注意给客户端命名之后,如果之前有过消费,那么​​--from-beginning​​就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

5.6、添加客户端属性​​--consumer-property​​

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性​​--group test-group​​ 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test ​​--consumer-property group.id=test-consumer-group​​

5.7、添加客户端属性​​--consumer.config​​

跟​​--consumer-property​​​ 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面,​​--consumer-property​​​ 的优先级大于 ​​--consumer.config​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

六、kafka-leader-election Leader 重新选举

6.1、指定 Topic 指定分区用重新​​PREFERRED:优先副本策略​​ 进行 Leader 重选举

sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2、所有 Topic 所有分区用重新​​PREFERRED:优先副本策略​​ 进行 Leader 重选举

sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --election-type preferred  --all-topic-partitions

6.3、设置配置文件批量指定 topic 和分区进行 Leader 重选举

先配置 leader-election.json 文件

{
  "partitions": [
    {
      "topic": "test_create_topic4",
      "partition": 1
    },
    {
      "topic": "test_create_topic4",
      "partition": 2
    }
  ]
}
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json

七、持续批量推送消息 kafka-verifiable-producer.sh

单次发送 100 条消息​​--max-messages 100​​
一共要推送多少条,默认为 - 1,-1 表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--max-messages 100​​

每秒发送最大吞吐量不超过消息 ​​--throughput 100​​
推送消息时的吞吐量,单位 messages/sec。默认为 - 1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--throughput 100​​

发送的消息体带前缀​​--value-prefix​​

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--value-prefix 666​​

注意​​--value-prefix 666​​​必须是整数,发送的消息体的格式是加上一个 点号​​.​​​ 例如: ​​666.​​
其他参数:

​​--producer.config CONFIG_FILE​​ 指定 producer 的配置文件
​​--acks ACKS​​ 每次推送消息的 ack 值,默认是 - 1

八、持续批量拉取消息 kafka-verifiable-consumer

持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

单次最大消费 10 条消息​​--max-messages 10​​

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 ​​--max-messages 10​​

九、生产者压力测试 kafka-producer-perf-test.sh

9.1、发送 1024 条消息​​--num-records 100​​并且每条消息大小为 1KB​​--record-size 1024​​ 最大吞吐量每秒 10000 条​​--throughput 100​​

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

9.2、用指定消息文件​​--payload-file​​发送 100 条消息最大吞吐量每秒 100 条​​--throughput 100​​

先配置好消息文件​​batchmessage.txt​​
然后执行命令
发送的消息会从​​batchmessage.txt​​里面随机选择;注意这里我们没有用参数​​--payload-delimeter​​指定分隔符,默认分隔符是 \n 换行;

bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

十、消费者压力测试 kafka-consumer-perf-test.sh

消费 100 条消息​​--messages 100​​

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100

十一、删除指定分区的消息 kafka-delete-records.sh

删除指定 topic 的某个分区的消息删除至 offset 为 1024
先配置 json 文件​​offset-json-file.json​​

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}

在执行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

十二、查看 Broker 磁盘信息 kafka-log-dirs.sh

查询指定 topic 磁盘信息​​--topic-list topic1,topic2​​

sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2

查询指定 Broker 磁盘信息​​--broker-list 0 broker1,broker2​​

sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2 --broker-list 0

例如我一个 3 分区 3 副本的 Topic 的查出来的信息
​​logDir​​ Broker 中配置的​​log.dir​​

{
  "version": 1,
  "brokers": [{
    "broker": 0,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 1,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 2,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 3,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
      "error": null,
      "partitions": []
    }]
  }]
}

十三、消费者组管理 kafka-consumer-groups.sh

13.1、查看消费者列表​​--list​​

​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list​​

先调用​​MetadataRequest​​拿到所有在线 Broker 列表
再给每个 Broker 发送​​ListGroupsRequest​​请求获取 消费者组数据

13.2、查看消费者组详情​​--describe​​

​​DescribeGroupsRequest​​
查看消费组详情​​--group​​ 或 ​​--all-groups​​
查看指定消费组详情​​--group​​​

​​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test2_consumer_group​​

查看所有消费组详情​​--all-groups​​​

​​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

查看该消费组 消费的所有 Topic、及所在分区、最新消费 offset、Log 最新数据 offset、Lag 还未消费数量、消费者 ID 等等信息。

查询消费者成员信息​​--members​​

​​sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090​

指定消费组成员信息

​sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

查询消费者状态信息​​--state​​

​​sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server 127.0.0.1:9092

指定消费组状态信息

​sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

13.3、删除消费者组​​--delete​​

​​DeleteGroupsRequest​​
删除消费组–delete
删除指定消费组​​--group​​​

​sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

删除所有消费组​​--all-groups​​​

sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server 127.0.0.1:9092​​

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费 / 不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed:

  • Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

13.4、重置消费组的偏移量 ​​--reset-offsets​​

能够执行成功的一个前提是 消费组这会是不可用状态;
下面的示例使用的参数是:​​--dry-run​​ ; 这个参数表示预执行,会打印出来将要处理的结果;
等你想真正执行的时候请换成参数​​--execute​​ ;
下面示例 重置模式都是 ​​--to-earliest​​ 重置到最早的;
请根据需要参考下面 相关重置 Offset 的模式 换成其他模式;
重置指定消费组的偏移量 ​​--group​​
重置指定消费组的所有 Topic 的偏移量​​--all-topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic

​​重置指定消费组的指定 Topic 的偏移量​​--topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2​​

重置所有消费组的偏移量 ​​--all-group​​
重置所有消费组的所有 Topic 的偏移量​​--all-topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic

​​重置所有消费组中指定 Topic 的偏移量​​--topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2​​

​​--reset-offsets​​ 后面需要接重置的模式

13.5、删除偏移量​​delete-offsets​​

能够执行成功的一个前提是 消费组这会是不可用状态;
偏移量被删除了之后,Consumer Group 下次启动的时候,会从头消费;

​sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server 127.0.0.1:9092 --topic test2​​

十四、查看日志文件 kafka-dump-log.sh

​​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log​​

查询 Log 文件具体信息 ​​--print-data-log​​

​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log​​

查询 index 文件具体信息

​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index​​

配置项为​​log.index.size.max.bytes​​; 来控制创建索引的大小;

查询 timeindex 文件

​​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex​​

kafka 重新消费已消费的记录:
其中,<kafka_host > 和 < port > 是 Kafka 服务器的主机名和端口号,<group_name > 是消费者组的名称,<topic_name > 是要消费的主题名称,<offset_number > 是要重置的偏移量位置。

kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-offset <offset_number> --execute

将 Kafka 消费者的偏移量设置到某个时间前:
其中,<kafka_host > 和 < port > 是 Kafka 服务器的主机名和端口号,<group_name > 是消费者组的名称,<topic_name > 是要消费的主题名称, 是要设置的时间点。请注意, 的格式应为 YYYY-MM-DDTHH:mm:ss.sss,例如 2022-01-01T00:00:00.000

kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-datetime <datetime> --execute

参考:
https://blog.51cto.com/szzdzhp/5302196

评论

暂无

添加新评论