两种迁移场景,分别是:同集群数据迁移、跨集群数据迁移。

一、同集群迁移
同集群之间数据迁移,比如在已有的集群中新增了一个 Broker 节点,此时需要将原来集群中已有的 Topic 的数据迁移部分到新的集群中,缓解集群压力。
将新的节点添加到 Kafka 集群很简单,只需为它们分配一个唯一的 Broker ID,并在新服务器上启动 Kafka。但是,这些新服务器节点不会自动分配任何数据分区,因此除非将分区移动到新增的节点,否则在创建新 Topic 之前新节点不会执行任何操作。因此,通常在将新服务器节点添加到 Kafka 集群时,需要将一些现有数据迁移到这些新的节点。
迁移数据的过程是手动启动的,执行过程是完全自动化的。在 Kafka 后台服务中,Kafka 将添加新服务器作为其正在迁移的分区的 Follower,并允许新增节点完全复制该分区中的现有数据。当新服务器节点完全复制此分区的内容并加入同步副本(ISR)时,其中一个现有副本将删除其分区的数据。
Kafka 系统提供了一个分区重新分配工具 kafka-reassign-partitions.sh,该工具可用于在 Broker 之间迁移分区。理想情况下,将确保所有 Broker 的数据和分区均匀分配。分区重新分配工具无法自动分析 Kafka 群集中的数据分布并迁移分区以实现均匀的负载均衡。因此,管理员在操作的时候,必须弄清楚应该迁移哪些 Topic 或分区。
分区重新分配工具可以在 3 种互斥模式下运行:
--generate:在此模式下,给定 Topic 列表和 Broker 列表,该工具会生成候选重新分配,以将指定 Topic 的所有分区迁移到新 Broker 中。此选项仅提供了一种方便的方法,可在给定 Topic 和目标 Broker 列表的情况下生成分区重新分配计划。--execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用--reassignment-json-file选项)。由管理员手动制定自定义重新分配计划,也可以使用--generate选项提供。--verify:在此模式下,该工具将验证最后一次--execute期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。
1.1、迁移过程实现
分区重新分配工具可用于将一些 Topic 从当前的 Broker 节点中迁移到新添加的 Broker 中。这在扩展现有集群时通常很有用,因为将整个 Topic 移动到新的 Broker 变得更容易,而不是一次移动一个分区。当执行此操作时,用户需要提供已有的 Broker 节点的 Topic 列表,以及到新节点的 Broker 列表(源 Broker 到新 Broker 的映射关系)。然后,该工具在新的 Broker 中均匀分配给指定 Topic 列表的所有分区。在迁移过程中,Topic 的复制因子保持不变。
现有如下实例,将 Topic 为 ke01,ke02 的所有分区从 Broker1 中移动到新增的 Broker2 和 Broker3 中。由于该工具接受 Topic 的输入列表作为 JSON 文件,因此需要明确迁移的 Topic 并创建 json 文件,如下所示:
> cat topic-to-move.json
{"topics": [{"topic": "ke01"},
{"topic": "ke02"}],
"version":1
}
准备好 JSON 文件,然后使用分区重新分配工具生成候选分配,命令如下:
> bin/kafka-reassign-partitions.sh --zookeeper dn1:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
执行命名之前,Topic(ke01、ke02)的分区如下图所示:


执行完成命令之后,控制台出现如下信息:

该工具生成一个候选分配,将所有分区从 Topic ke01,ke02 移动到 Broker1 和 Broker2。需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。新的赋值应保存在 JSON 文件(例如 expand-cluster-reassignment.json)中,以使用 --execute 选项执行。JSON 文件如下:
{"version":1,"partitions":[{"topic":"ke02","partition":0,"replicas":[2]},{"topic":"ke02","partition":1,"replicas":[1]},{"topic":"ke02","partition":2,"replicas":[2]},{"topic":"ke01","partition":0,"replicas":[2]},{"topic":"ke01","partition":1,"replicas":[1]},{"topic":"ke01","partition":2,"replicas":[2]}]}
执行命令如下所示:
> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
最后,--verify 选项可与该工具一起使用,以检查分区重新分配的状态。需要注意的是,相同的 expand-cluster-reassignment.json(与 --execute 选项一起使用)应与 --verify 选项一起使用,执行命令如下:
> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
执行结果如下图所示:

同时,可以通过 Kafka Eagle 工具来查看 Topic 的分区情况。


二、跨集群迁移
这里跨集群迁移,指的是在 Kafka 多个集群之间复制数据 “镜像” 的过程,以避免与单个集群中的节点之间发生的复制混淆。 Kafka 附带了一个用于在 Kafka 集群之间镜像数据的工具。该工具从源集群使用并生成到目标集群。这种镜像的一个常见用例是在另一个数据中心提供副本。
另外,你可以运行许多此类镜像进程以提高吞吐量和容错(如果一个进程终止,其他进程将占用额外负载)。将从源集群中的 Topic 读取数据,并将其写入目标集群中具有相同名称的主题。事实上,“镜像” 数据只不过是一个 Kafka 将消费者和生产者联系在了一起。
源集群和目标集群是完全独立的实体,它们可以具有不同数量的分区,并且偏移量将不相同。出于这个原因,镜像集群并不是真正意图作为容错机制(因为消费者的位置会有所不同); 为此,建议使用正常的集群内复制。但是,镜像进程将保留并使用消息 Key 进行分区,因此可以按 Key 保留顺序。
下面是一个跨集群的单 Topic 实例,命令如下:
./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist ke03
需要注意的是,consumer.properties 文件配置源 Kafka 集群 Broker 地址,producer.properties 文件配置目标 Kafka 集群地址。如果需要迁移多个 Topic,可以使用 --whitelist 'A|B',如果需要迁移所有的 Topic,可以使用 --whitelist '*'。
三、结果预览
执行跨集群迁移命令后,目标集群中使用 Kafka Eagle 中查看 Topic Size 大小看是否与源集群的 Topic Size 大小相等,或者使用 SQL 语句,验证是否有数据迁移过来,结果如下图所示:
select * from "ke03" where "partition" in (0)
跨集群迁移数据的本质是,Kafka 启动了消费者读取源集群数据,并将消费后的数据写入到目标集群,在迁移的过程中,可以启动多个实例,提供迁出的吞吐量。
评论