Kafka 启停
启动
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
停止
/opt/module/kafka/bin/kafka-server-stop.sh
一、Topic Command
1.1、Topic创建
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
1.2、删除Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
1.3、Topic分区扩容
单个Topic扩容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --alter --partitions 4
1.4、查询Topic描述
1、查询单个Topic
bin/kafka-topics.sh --topic test --bootstrap-server localhost:9092 --describe --exclude-internal
2、批量查询Topic(正则表达式匹配,下面是查询所有Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server localhost:9092 --describe --exclude-internal
支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来。
1.5、查询Topic列表
1、查询所有Topic列表
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal
2、查询匹配Topic列表(正则表达式)
查询test_create_开头的所有Topic列表。
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list --exclude-internal --topic "user.*"
二、Config Command
2.1、查询配置
1、查询单个Topic配置(只列举动态配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test
或者
sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --entity-type topics --entity-name test
2、查询所有Topic配置(包括内部Topic)(只列举动态配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1.85:9092 --entity-type topics
3、查询Topic的详细配置(动态+静态)
只需要加上一个参数--all
其他配置/clients/users/brokers/broker-loggers 的查询
同理 ;只需要将--entity-type 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)
查询kafka版本信息
bin/kafka-configs.sh --describe --bootstrap-server 127.0.0.1:9092 --version
2.2、增删改 配置 --alter
–alter
删除配置: --delete-config k1=v1,k2=v2
添加/修改配置: --add-config k1,k2
选择类型: --entity-type (topics/clients/users/brokers/broker-loggers)
类型名称: --entity-name
Topic添加/修改动态配置
--add-config
bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
Topic删除动态配置
--delete-config
bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
添加/删除配置同时执行
bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms
其他配置同理,只需要类型改下--entity-type
类型有: (topics/clients/users/brokers/broker- loggers)
默认配置
配置默认--entity-default
sh bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888
动态配置的默认配置是使用了节点
优先级:指定动态配置>默认动态配置>静态配置
三、副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions
【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移
【kafka实战】分区重分配可能出现的问题和排查问题思路
四、Topic的发送kafka-console-producer.sh
4.1、生产无key消息
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
4.2、生产有key消息
加上属性--property parse.key=true
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)。
五、Topic的消费kafka-console-consumer.sh
5.1、新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)
下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费:
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
5.2、正则表达式匹配topic进行消费--whitelist
消费所有的topic
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’
消费所有的topic,并且还从头消费
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning
5.3、显示key进行消费--property print.key=true
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true
5.4、指定分区消费--partition 指定起始偏移量消费--offset
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100
5.5、给客户端命名--group
注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group
5.6、添加客户端属性--consumer-property
这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
5.7、添加客户端属性--consumer.config
跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
六、kafka-leader-election Leader重新选举
6.1、指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2、所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --election-type preferred --all-topic-partitions
6.3、设置配置文件批量指定topic和分区进行Leader重选举
先配置leader-election.json文件
{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
七、持续批量推送消息kafka-verifiable-producer.sh
单次发送100条消息--max-messages 100
一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100
每秒发送最大吞吐量不超过消息 --throughput 100
推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100
发送的消息体带前缀--value-prefix
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666
注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.
其他参数:
--producer.config CONFIG_FILE 指定producer的配置文件
--acks ACKS 每次推送消息的ack值,默认是-1
八、持续批量拉取消息kafka-verifiable-consumer
持续消费
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
单次最大消费10条消息--max-messages 10
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10
九、生产者压力测试kafka-producer-perf-test.sh
9.1、发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024 最大吞吐量每秒10000条--throughput 100
sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024
9.2、用指定消息文件--payload-file发送100条消息最大吞吐量每秒100条--throughput 100
先配置好消息文件batchmessage.txt
然后执行命令
发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符,默认分隔符是\n换行;
bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt
十、消费者压力测试kafka-consumer-perf-test.sh
消费100条消息--messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100
十一、删除指定分区的消息kafka-delete-records.sh
删除指定topic的某个分区的消息删除至offset为1024
先配置json文件offset-json-file.json
{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}
在执行命令
sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json
十二、查看Broker磁盘信息kafka-log-dirs.sh
查询指定topic磁盘信息--topic-list topic1,topic2
sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2
查询指定Broker磁盘信息--broker-list 0 broker1,broker2
sh bin/kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test2 --broker-list 0
例如我一个3分区3副本的Topic的查出来的信息
logDir Broker中配置的log.dir
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []
}]
}]
}
十三、消费者组管理 kafka-consumer-groups.sh
13.1、查看消费者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
先调用MetadataRequest拿到所有在线Broker列表
再给每个Broker发送ListGroupsRequest请求获取 消费者组数据
13.2、查看消费者组详情--describe
DescribeGroupsRequest
查看消费组详情--group 或 --all-groups
查看指定消费组详情--group
sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test2_consumer_group
查看所有消费组详情--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息。
查询消费者成员信息--members
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消费组成员信息
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server 127.0.0.1:9092
查询消费者状态信息--state
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server 127.0.0.1:9092
指定消费组状态信息
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server 127.0.0.1:9092
13.3、删除消费者组--delete
DeleteGroupsRequest
删除消费组–delete
删除指定消费组--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server 127.0.0.1:9092
删除所有消费组--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server 127.0.0.1:9092
PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常
Error: Deletion of some consumer groups failed:
- Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
13.4、重置消费组的偏移量 --reset-offsets
能够执行成功的一个前提是 消费组这会是不可用状态;
下面的示例使用的参数是:--dry-run
;这个参数表示预执行,会打印出来将要处理的结果;
等你想真正执行的时候请换成参数--execute
;
下面示例 重置模式都是 --to-earliest 重置到最早的;
请根据需要参考下面 相关重置Offset的模式 换成其他模式;
重置指定消费组的偏移量 --group
重置指定消费组的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic
重置指定消费组的指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2
重置所有消费组的偏移量 --all-group
重置所有消费组的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --all-topic
重置所有消费组中指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server 127.0.0.1:9092 --dry-run --topic test2
--reset-offsets
后面需要接重置的模式
13.5、删除偏移量delete-offsets
能够执行成功的一个前提是 消费组这会是不可用状态;
偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server 127.0.0.1:9092 --topic test2
十四、查看日志文件 kafka-dump-log.sh
sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log
查询Log文件具体信息 --print-data-log
sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log
查询index文件具体信息
sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index
配置项为log.index.size.max.bytes; 来控制创建索引的大小;
查询timeindex文件
sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex
kafka 重新消费已消费的记录:
其中,<kafka_host>和
kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-offset <offset_number> --execute
将Kafka消费者的偏移量设置到某个时间前:
其中,<kafka_host>和
kafka-consumer-groups.sh --bootstrap-server <kafka_host>:<port> --group <group_name> --topic <topic_name> --reset-offsets --to-datetime <datetime> --execute
评论