一、脚本的使用介绍
该脚本是 kafka 提供用来重新分配分区的脚本工具;
1.1、生成推荐配置脚本
关键参数 --generate
。
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件;
编写 move-json-file.json
文件;这个文件就是告知想对哪些 Topic 进行重新分配的计算。
{
"topics": [
{"topic": "test_create_topic1"}
],
"version": 1
}
然后执行下面的脚本,--broker-list "0,1,2,3"
这个参数是你想要分配的 Brokers。
sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate
执行完毕之后会打印:
Current partition replica assignment// 当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration// 期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。
1.2. 执行 Json 文件
关键参数 --execute
。
将上面得到期望的重新分配方式文件保存在一个 json 文件里面 reassignment-json-file.json
。
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
然后执行
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute
迁移过程注意流量陡增对集群的影响:
Kafka 提供一个 broker 之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新 broker,添加或移除 broker 时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响、
例如上面的迁移操作加一个限流选项 --throttle 50000000
。
在后面加上一个 --throttle 50000000
参数,那么执行移动分区的时候,会被限制流量在 50000000 B/s
加上参数后可以看到
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
需要注意的是,如果你迁移的时候包含副本跨路径迁移 (同一个 Broker 多个路径) 那么这个限流措施不会生效,需要再加上 --replica-alter-log-dirs-throttle
这个限流参数,它限制的是同一个 Broker 不同路径直接迁移的限流。
如果想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。可以重新运行 execute 命令,用相同的 reassignment-json-file
。
在重新平衡期间,管理员可以使用指标监控复制是否正在进行:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
在复制过程中,滞后应该不断减少。如果指标没有减少,管理员应该如上所述增加节流吞吐量。
1.3、验证
关键参数 --verify
。
该选项用于检查分区重新分配的状态,同时 --throttle
流量限制也会被移除掉;否则可能会导致定期复制操作的流量也受到限制。
sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify
二、副本扩缩
kafka 并没有提供一个专门的脚本来支持副本的扩缩,不像 kafka-topic.sh
脚本一样,是可以扩分区的;想要对副本进行扩缩,只能是曲线救国了,利用 kafka-reassign-partitions.sh
来重新分配副本。
2.1、副本扩容
假设我们当前的情况是 3 分区 1 副本,为了提供可用性,我想把副本数升到 2。
2.1.1、计算副本分配方式
用步骤 1.1 的 --generate
获取一下当前的分配情况,得到如下 json。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0],
"log_dirs": ["any"]
}]
}
我们想把所有分区的副本都变成 2, 那我们只需修改 "replicas": []
里面的值了,这里面是 Broker 列表,排在第一个的是 Leader,所以我们根据自己想要的分配规则修改一下 json 文件就变成如下。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2,0],
"log_dirs": ["any","any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1,2],
"log_dirs": ["any","any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0,1],
"log_dirs": ["any","any"]
}]
}
注意 log_dirs
里面的数量要和 replicas
数量匹配;或者直接把 log_dirs
选项删除掉;这个 log_dirs
是副本跨路径迁移时候的绝对路径。
2.1.2、执行–execute
如果想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。可以重新运行 execute 命令,用相同的 reassignment-json-file
。
2.1.3、验证–verify
副本数量增加了。
2.2、副本缩容
副本缩容跟扩容是一个意思;当副本分配少于之前的数量时候,多出来的副本会被删除。
比如刚刚新增了一个副本,想重新恢复到一个副本。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0],
"log_dirs": ["any"]
}]
}
执行之后可以看到其他的副本就被标记为删除了。一会就会被清理掉。
用这样一种方式我们虽然是实现了副本的扩缩容,但是副本的分配需要我们自己来把控好,要做到负载均衡等等;那肯定是没有 kafka 自动帮我们分配比较合理一点。
--generate
本质上也是调用了这个方法:
dminUtils.assignReplicasToBrokers (brokerMetadatas, assignment.size, replicas.size)
三、分区扩容
kafka 的分区扩容是 kafka-topis.sh
脚本实现的,不支持缩容。
四、分区迁移
分区迁移跟上面同理,请看 1.1,1.2,1.3 部分。
五、副本跨路径迁移
为什么线上 Kafka 机器各个磁盘间的占用不均匀,经常出现 “一边倒” 的情形? 这是因为 Kafka 只保证分区数量在各个磁盘上均匀分布,但它无法知晓每个分区实际占用空间,故很有可能出现某些分区消息数量巨大导致占用大量磁盘空间的情况。在 1.1 版本之前,用户对此毫无办法,因为 1.1 之前 Kafka 只支持分区数据在不同 broker 间的重分配,而无法做到在同一个 broker 下的不同磁盘间做重分配。1.1 版本正式支持副本在不同路径间的迁移。
怎么在一台 Broker 上用多个路径存放分区呢?
############################# Log Basics ##############################
A comma separated list of directories under which to store log fileslog.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
注意同一个 Broker 上不同路径只会存放不同的分区,而不会将副本存放在同一个 Broker; 不然那副本就没有意义了 (容灾)
怎么针对跨路径迁移呢?
迁移的 json 文件有一个参数是 log_dirs; 默认请求不传的话 它是 "log_dirs": ["any"]
(这个数组的数量要跟副本保持一致)
但是你想实现跨路径迁移,只需要在这里填入绝对路径就行了,例如下面。
迁移的 json 文件示例:
{
"version": 1,
"partitions": [
{
"topic": "test_create_topic4",
"partition": 2,
"replicas": [
0
],
"log_dirs": [
"/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"
]
},
{
"topic": "test_create_topic4",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"
]
}
]
}
然后执行脚本:
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-serverxxxxx:9092 --replica-alter-log-dirs-throttle 10000
注意 --bootstrap-server
在跨路径迁移的情况下,必须传入此参数。
如果需要限流的话 加上参数 --replica-alter-log-dirs-throttle
; 跟 --throttle
不一样的是 --replica-alter-log-dirs-throttle
限制的是 Broker 内不同路径的迁移流量。
评论