Flink-CDC

Flink-CDC

一、CDC 简介

1.1、CDC

CDC 是 Change Data Capture(变更数据获取) 的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2、CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,主要了解一下这两种之间的区别:

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

二、FlinkCDC 示例

2.1、数据准备

2.1.1、在MySQL中创建数据库及表

2.1.2、插入数据

1)在 test 数据库中插入数据
2)在 test_route 数据库中插入数据

2.1.3、开启 MySQL Binlog 并重启 MySQL

[atguigu@hadoop103 ~]$ sudo vim /etc/my.cnf

#添加如下配置信息,开启 test 以及 test_route 数据库的 Binlog
#数据库id
server-id = 1
##启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
##binlog类型,maxwell要求为row类型
binlog_format=row
##启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=test
binlog-do-db=test_route

2.2、DataStream方式的应用

2.2.1、导入依赖

2.2.2、编写代码

详见代码。

2.2.3、案例测试

1)打包并上传至 Linux
2)启动 HDFS 集群

[atguigu@hadoop102 flink-1.18.0]$ start-dfs.sh

3)启动 Flink 集群

[atguigu@hadoop102 flink-1.18.0]$ bin/start-cluster.sh

4)启动程序

[atguigu@hadoop102 flink-1.18.0]$ bin/flink run -m hadoop102:8081 -c com.atguigu.cdc.FlinkCDCDataStreamTest ./flink-cdc-test.jar

5)观察 TaskManager 日志,会从头读取表数据
6)给当前的 Flink 程序创建 Savepoint

[atguigu@hadoop102 flink-local]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flinkCDC/save

在 WebUI 中 cancelJob 。
在 MySQL 的 test.t1 表中添加、修改或者删除数据。
从 Savepoint 重启程序。

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flinkCDC/save/savepoint-5dadae-02c69ee54885 -c com.atguigu.cdc. FlinkCDCDataStreamTest ./gmall-flink-cdc.jar

2.3、FlinkSQL 方式的应用

2.3.1、编写代码

详见代码

2.3.2、代码测试

直接运行即可,打包到集群测试与 DataStream 相同!

2.4、MySQL 到 Doris 的 StreamingETL 实现(3.0)

2.4.1、环境准备

1)安装 FlinkCDC

[atguigu@hadoop102 software]$ tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/

2)拖入 MySQL 以及 Doris 依赖包
flink-cdc-pipeline-connector-doris-3.0.0.jar以及flink-cdc-pipeline-connector-mysql-3.0.0.jar防止在FlinkCDC 的 lib 目录下。

2.4.2、同步变更

2)编写 MySQL 到 Doris 的同步变更配置文件
在 FlinkCDC 目录下创建 job 文件夹,并在该目录下创建同步变更配置文件。

[atguigu@hadoop102 flink-cdc-3.0.0]$ mkdir job/
[atguigu@hadoop102 flink-cdc-3.0.0]$ cd job
[atguigu@hadoop102 job]$ vim mysql-to-doris.yaml

source:
  type: mysql
  hostname: hadoop103
  port: 3306
  username: root
  password: "000000"
  tables: test.\.*
  server-id: 5400-5404
  server-time-zone: UTC+8

sink:
  type: doris
  fenodes: hadoop102:7030
  username: root
  password: "000000"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1

3)启动任务并测试
(1)开启 Flink 集群,注意:在 Flink 配置信息中打开 CheckPoint。

[atguigu@hadoop103 flink-1.18.0]$ vim conf/flink-conf.yaml

添加如下配置信息:

execution.checkpointing.interval: 5000

分发该文件至其他 Flink 机器并启动 Flink 集群:

[atguigu@hadoop103 flink-1.18.0]$ bin/start-cluster.sh

(2)开启 Doris FE

[atguigu@hadoop102 fe]$ bin/start_fe.sh

(3)开启 Doris BE

[atguigu@hadoop102 be]$ bin/start_be.sh

(4)在 Doris 中创建 test 数据库

[atguigu@hadoop103 doris]$ mysql -uroot -p000000 -P9030 -hhadoop102
mysql> create database test;
Query OK, 1 row affected (0.00 sec)

mysql> create database doris_test_route;
Query OK, 1 row affected (0.00 sec)

(5)启动 FlinkCDC 同步变更任务

[atguigu@hadoop103 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml

(6)刷新 Doris 中 test 数据库观察结果。
(7)在 MySQL 的 test 数据中对应的几张表进行新增、修改数据以及新增列操作,并刷新 Doris 中 test 数据库观察结果。

2.4.3 路由变更

1)编写 MySQL 到 Doris 的路由变更配置文件

source:
  type: mysql
  hostname: hadoop103
  port: 3306
  username: root
  password: "000000"
  tables: test_route.\.*
  server-id: 5400-5404
  server-time-zone: UTC+8

sink:
  type: doris
  fenodes: hadoop102:7030
  benodes: hadoop102:7040
  username: root
  password: "000000"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: test_route.t1
    sink-table: doris_test_route.doris_t1
  - source-table: test_route.t2
    sink-table: doris_test_route.doris_t1
  - source-table: test_route.t3
    sink-table: doris_test_route.doris_t3

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1

2)启动任务并测试

[atguigu@hadoop103 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris-route.yaml

3)刷新 Doris 中 test_route 数据库观察结果。
4)在 MySQL 的 test_route 数据中对应的几张表进行新增、修改数据操作,并刷新 Doris 中 doris_test_route 数据库观察结果。

评论

暂无

添加新评论