一、脚本的使用介绍
该脚本是kafka提供用来重新分配分区的脚本工具;
1.1、生成推荐配置脚本
关键参数--generate
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件;
编写move-json-file.json
文件; 这个文件就是告知想对哪些Topic进行重新分配的计算。
{
"topics": [
{"topic": "test_create_topic1"}
],
"version": 1
}
然后执行下面的脚本,--broker-list "0,1,2,3" 这个参数是你想要分配的Brokers;
sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate
执行完毕之后会打印:
Current partition replica assignment//当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration//期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。
1.2. 执行Json文件
关键参数--execute
将上面得到期望的重新分配方式文件保存在一个json文件里面reassignment-json-file.json
。
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
然后执行
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute
迁移过程注意流量陡增对集群的影响
Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响、
例如我们上面的迁移操作加一个限流选项--throttle 50000000
。
在后面加上一个 --throttle 50000000
参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s
加上参数后你可以看到
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上--replica-alter-log-dirs-throttle
这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file
在重新平衡期间,管理员可以使用指标监控复制是否正在进行:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
在复制过程中,滞后应该不断减少。如果指标没有减少,管理员应该如上所述增加节流吞吐量。
1.3、验证
关键参数--verify
该选项用于检查分区重新分配的状态,同时—throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。
sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify
二、副本扩缩
kafka并没有提供一个专门的脚本来支持副本的扩缩, 不像kafka-topic.sh脚本一样,是可以扩分区的; 想要对副本进行扩缩,只能是曲线救国了; 利用kafka-reassign-partitions.sh
来重新分配副本。
2.1、副本扩容
假设我们当前的情况是 3分区1副本,为了提供可用性,我想把副本数升到2;
2.1.1、计算副本分配方式
我们用步骤1.1的 --generate
获取一下当前的分配情况,得到如下json。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0],
"log_dirs": ["any"]
}]
}
我们想把所有分区的副本都变成2,那我们只需修改"replicas": []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以我们根据自己想要的分配规则修改一下json文件就变成如下。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2,0],
"log_dirs": ["any","any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1,2],
"log_dirs": ["any","any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0,1],
"log_dirs": ["any","any"]
}]
}
注意log_dirs
里面的数量要和replicas
数量匹配;或者直接把log_dirs
选项删除掉; 这个log_dirs
是副本跨路径迁移时候的绝对路径。
2.1.2、执行–execute
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file
。
2.1.3、验证–verify
副本数量增加了;
2.2、副本缩容
副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;
比如刚刚新增了一个副本,想重新恢复到一个副本。
{
"version": 1,
"partitions": [{
"topic": "test_create_topic1",
"partition": 2,
"replicas": [2],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 1,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "test_create_topic1",
"partition": 0,
"replicas": [0],
"log_dirs": ["any"]
}]
}
执行之后可以看到其他的副本就被标记为删除了; 一会就会被清理掉。
用这样一种方式我们虽然是实现了副本的扩缩容, 但是副本的分配需要我们自己来把控好, 要做到负载均衡等等; 那肯定是没有kafka自动帮我们分配比较合理一点。
--generate
本质上也是调用了这个方法:
dminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
三、分区扩容
kafka的分区扩容是 kafka-topis.sh脚本实现的,不支持缩容。
四、分区迁移
分区迁移跟上面同理, 请看 1.1,1.2,1.3 部分。
五、副本跨路径迁移
为什么线上Kafka机器各个磁盘间的占用不均匀,经常出现“一边倒”的情形? 这是因为Kafka只保证分区数量在各个磁盘上均匀分布,但它无法知晓每个分区实际占用空间,故很有可能出现某些分区消息数量巨大导致占用大量磁盘空间的情况。在1.1版本之前,用户对此毫无办法,因为1.1之前Kafka只支持分区数据在不同broker间的重分配,而无法做到在同一个broker下的不同磁盘间做重分配。1.1版本正式支持副本在不同路径间的迁移。
怎么在一台Broker上用多个路径存放分区呢?
############################# Log Basics ##############################
A comma separated list of directories under which to store log fileslog.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
注意同一个Broker上不同路径只会存放不同的分区,而不会将副本存放在同一个Broker; 不然那副本就没有意义了(容灾)
怎么针对跨路径迁移呢?
迁移的json文件有一个参数是log_dirs; 默认请求不传的话 它是"log_dirs": ["any"] (这个数组的数量要跟副本保持一致)
但是你想实现跨路径迁移,只需要在这里填入绝对路径就行了,例如下面。
迁移的json文件示例:
{
"version": 1,
"partitions": [
{
"topic": "test_create_topic4",
"partition": 2,
"replicas": [
0
],
"log_dirs": [
"/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"
]
},
{
"topic": "test_create_topic4",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"
]
}
]
}
然后执行脚本:
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-serverxxxxx:9092 --replica-alter-log-dirs-throttle 10000
注意 --bootstrap-server
在跨路径迁移的情况下,必须传入此参数。
如果需要限流的话 加上参数--replica-alter-log-dirs-throttle
; 跟--throttle
不一样的是 --replica-alter-log-dirs-throttle
限制的是Broker内不同路径的迁移流量。
评论