Kafka命令整合

Kafka命令整合

Kafka 启停

启动
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
停止
/opt/module/kafka/bin/kafka-server-stop.sh

一、Topic Command

1.1、Topic创建

​​bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test​​

1.2、删除Topic

​bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test​​

1.3、Topic分区扩容

单个Topic扩容

​bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4​​

批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)

​​sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --alter --partitions 4​​

1.4、查询Topic描述

1、查询单个Topic

bin/kafka-topics.sh --topic test --bootstrap-server localhost:9092 --describe --exclude-internal​​

2、批量查询Topic(正则表达式匹配,下面是查询所有Topic)

​​sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --describe --exclude-internal​​

支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来。

1.5、查询Topic列表

1、查询所有Topic列表

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal​​

2、查询匹配Topic列表(正则表达式)

查询​​test_create_​​开头的所有Topic列表。

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal --topic "user.*"

二、Config Command

2.1、查询配置

1、查询单个Topic配置(只列举动态配置)

​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test​​ 
或者
​​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --entity-type topics --entity-name test​​

2、查询所有Topic配置(包括内部Topic)(只列举动态配置)

​​sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1.85:9092 --entity-type topics​​

3、查询Topic的详细配置(动态+静态)

只需要加上一个参数​​--all​​

其他配置/clients/users/brokers/broker-loggers 的查询

同理 ;只需要将​​--entity-type​​ 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

查询kafka版本信息

bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --version​​

2.2、增删改 配置 ​​--alter​​

–alter
删除配置: ​​--delete-config​​ k1=v1,k2=v2
添加/修改配置: ​​--add-config​​ k1,k2
选择类型: ​​--entity-type​​ (topics/clients/users/brokers/broker-loggers)
类型名称: ​​--entity-name​​

Topic添加/修改动态配置​​
--add-config​​

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999​​

Topic删除动态配置
​​--delete-config​​

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms​​

添加/删除配置同时执行

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms​​

其他配置同理,只需要类型改下​​--entity-type​​

类型有: (topics/clients/users/brokers/broker- loggers)   

默认配置
配置默认​​--entity-default​​

sh bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888​​

动态配置的默认配置是使用了节点 ​​​​;
优先级:指定动态配置>默认动态配置>静态配置

三、副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions

【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移​​
【​​kafka实战】分区重分配可能出现的问题和排查问题思路

四、Topic的发送kafka-console-producer.sh

4.1、生产无key消息

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2、生产有key消息

加上属性​​--property parse.key=true​​

## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)。

五、Topic的消费kafka-console-consumer.sh

5.1、新客户端从头消费​​--from-beginning​​ (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)

下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

5.2、正则表达式匹配topic进行消费​​--whitelist​​

​​消费所有的topic​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

​​消费所有的topic,并且还从头消费​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

5.3、显示key进行消费​​--property print.key=true​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

5.4、指定分区消费​​--partition​​ 指定起始偏移量消费​​--offset​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5.5、给客户端命名​​--group​​

注意给客户端命名之后,如果之前有过消费,那么​​--from-beginning​​就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

5.6、添加客户端属性​​--consumer-property​​

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性​​--group test-group​​ 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test ​​--consumer-property group.id=test-consumer-group​​

5.7、添加客户端属性​​--consumer.config​​

跟​​--consumer-property​​​ 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, ​​--consumer-property​​​ 的优先级大于 ​​--consumer.config​​

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

六、kafka-leader-election Leader重新选举

6.1、指定Topic指定分区用重新​​PREFERRED:优先副本策略​​ 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2、所有Topic所有分区用重新​​PREFERRED:优先副本策略​​ 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --election-type preferred  --all-topic-partitions

6.3、设置配置文件批量指定topic和分区进行Leader重选举

先配置leader-election.json文件

{
  "partitions": [
    {
      "topic": "test_create_topic4",
      "partition": 1
    },
    {
      "topic": "test_create_topic4",
      "partition": 2
    }
  ]
}
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json

七、持续批量推送消息kafka-verifiable-producer.sh

单次发送100条消息​​--max-messages 100​​
一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--max-messages 100​​

每秒发送最大吞吐量不超过消息 ​​--throughput 100​​
推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--throughput 100​​

发送的消息体带前缀​​--value-prefix​​

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 ​​--value-prefix 666​​

注意​​--value-prefix 666​​​必须是整数,发送的消息体的格式是加上一个 点号​​.​​​ 例如: ​​666.​​
其他参数:

​​--producer.config CONFIG_FILE​​ 指定producer的配置文件
​​--acks ACKS​​ 每次推送消息的ack值,默认是-1

八、持续批量拉取消息kafka-verifiable-consumer

持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

单次最大消费10条消息​​--max-messages 10​​

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 ​​--max-messages 10​​

九、生产者压力测试kafka-producer-perf-test.sh

9.1、发送1024条消息​​--num-records 100​​并且每条消息大小为1KB​​--record-size 1024​​ 最大吞吐量每秒10000条​​--throughput 100​​

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

9.2、用指定消息文件​​--payload-file​​发送100条消息最大吞吐量每秒100条​​--throughput 100​​

先配置好消息文件​​batchmessage.txt​​
然后执行命令
发送的消息会从​​batchmessage.txt​​里面随机选择; 注意这里我们没有用参数​​--payload-delimeter​​指定分隔符,默认分隔符是\n换行;

bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

十、消费者压力测试kafka-consumer-perf-test.sh

消费100条消息​​--messages 100​​

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100

十一、删除指定分区的消息kafka-delete-records.sh

删除指定topic的某个分区的消息删除至offset为1024
先配置json文件​​offset-json-file.json​​

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}

在执行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

十二、查看Broker磁盘信息kafka-log-dirs.sh

查询指定topic磁盘信息​​--topic-list topic1,topic2​​

sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2

查询指定Broker磁盘信息​​--broker-list 0 broker1,broker2​​

sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2 --broker-list 0

例如我一个3分区3副本的Topic的查出来的信息
​​logDir​​ Broker中配置的​​log.dir​​

{
  "version": 1,
  "brokers": [{
    "broker": 0,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 1,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 2,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
      "error": null,
      "partitions": [{
        "partition": "test2-1",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-0",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }, {
        "partition": "test2-2",
        "size": 0,
        "offsetLag": 0,
        "isFuture": false
      }]
    }]
  }, {
    "broker": 3,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
      "error": null,
      "partitions": []
    }]
  }]
}

十三、消费者组管理 kafka-consumer-groups.sh

13.1、查看消费者列表​​--list​​

​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list​​

先调用​​MetadataRequest​​拿到所有在线Broker列表
再给每个Broker发送​​ListGroupsRequest​​请求获取 消费者组数据

13.2、查看消费者组详情​​--describe​​

​​DescribeGroupsRequest​​
查看消费组详情​​--group​​ 或 ​​--all-groups​​
查看指定消费组详情​​--group​​​

​​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test2_consumer_group​​

查看所有消费组详情​​--all-groups​​​

​​sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息。

查询消费者成员信息​​--members​​

​​sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090​

指定消费组成员信息

​sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

查询消费者状态信息​​--state​​

​​sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server 127.0.0.1:9092

指定消费组状态信息

​sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

13.3、删除消费者组​​--delete​​

​​DeleteGroupsRequest​​
删除消费组–delete
删除指定消费组​​--group​​​

​sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server 127.0.0.1:9092​​

删除所有消费组​​--all-groups​​​

sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server 127.0.0.1:9092​​

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed:

  • Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

13.4、重置消费组的偏移量 ​​--reset-offsets​​

能够执行成功的一个前提是 消费组这会是不可用状态;
下面的示例使用的参数是:​​--dry-run​​ ;这个参数表示预执行,会打印出来将要处理的结果;
等你想真正执行的时候请换成参数​​--execute​​ ;
下面示例 重置模式都是 ​​--to-earliest​​ 重置到最早的;
请根据需要参考下面 相关重置Offset的模式 换成其他模式;
重置指定消费组的偏移量 ​​--group​​
重置指定消费组的所有Topic的偏移量​​--all-topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic

​​重置指定消费组的指定Topic的偏移量​​--topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2​​

重置所有消费组的偏移量 ​​--all-group​​
重置所有消费组的所有Topic的偏移量​​--all-topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic

​​重置所有消费组中指定Topic的偏移量​​--topic​​​

​​sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2​​

​​--reset-offsets​​ 后面需要接重置的模式

13.5、删除偏移量​​delete-offsets​​

能够执行成功的一个前提是 消费组这会是不可用状态;
偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;

​sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server 127.0.0.1:9092 --topic test2​​

十四、查看日志文件 kafka-dump-log.sh

​​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log​​

查询Log文件具体信息 ​​--print-data-log​​

​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log​​

查询index文件具体信息

​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index​​

配置项为​​log.index.size.max.bytes​​; 来控制创建索引的大小;

查询timeindex文件

​​sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex​​

kafka 重新消费已消费的记录:
其中,<kafka_host>和是Kafka服务器的主机名和端口号,<group_name>是消费者组的名称,<topic_name>是要消费的主题名称,<offset_number>是要重置的偏移量位置。

kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-offset <offset_number> --execute

将Kafka消费者的偏移量设置到某个时间前:
其中,<kafka_host>和是Kafka服务器的主机名和端口号,<group_name>是消费者组的名称,<topic_name>是要消费的主题名称,是要设置的时间点。请注意,的格式应为YYYY-MM-DDTHH:mm:ss.sss,例如2022-01-01T00:00:00.000

kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-datetime <datetime> --execute

参考:
https://blog.51cto.com/szzdzhp/5302196

评论

暂无

添加新评论