一、CDC 简介
1.1、CDC
CDC 是 Change Data Capture(变更数据获取) 的简称
。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
1.2、CDC 的种类
CDC 主要分为基于查询和基于 Binlog 两种方式,主要了解一下这两种之间的区别:
1.3、Flink-CDC
Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
二、FlinkCDC 示例
2.1、数据准备
2.1.1、在MySQL中创建数据库及表
2.1.2、插入数据
1)在 test 数据库中插入数据
2)在 test_route 数据库中插入数据
2.1.3、开启 MySQL Binlog 并重启 MySQL
[atguigu@hadoop103 ~]$ sudo vim /etc/my.cnf
#添加如下配置信息,开启 test 以及 test_route 数据库的 Binlog
#数据库id
server-id = 1
##启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
##binlog类型,maxwell要求为row类型
binlog_format=row
##启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=test
binlog-do-db=test_route
2.2、DataStream方式的应用
2.2.1、导入依赖
2.2.2、编写代码
详见代码。
2.2.3、案例测试
1)打包并上传至 Linux
2)启动 HDFS 集群
[atguigu@hadoop102 flink-1.18.0]$ start-dfs.sh
3)启动 Flink 集群
[atguigu@hadoop102 flink-1.18.0]$ bin/start-cluster.sh
4)启动程序
[atguigu@hadoop102 flink-1.18.0]$ bin/flink run -m hadoop102:8081 -c com.atguigu.cdc.FlinkCDCDataStreamTest ./flink-cdc-test.jar
5)观察 TaskManager 日志,会从头读取表数据
6)给当前的 Flink 程序创建 Savepoint
[atguigu@hadoop102 flink-local]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flinkCDC/save
在 WebUI 中 cancelJob 。
在 MySQL 的 test.t1 表中添加、修改或者删除数据。
从 Savepoint 重启程序。
[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flinkCDC/save/savepoint-5dadae-02c69ee54885 -c com.atguigu.cdc. FlinkCDCDataStreamTest ./gmall-flink-cdc.jar
2.3、FlinkSQL 方式的应用
2.3.1、编写代码
详见代码
2.3.2、代码测试
直接运行即可,打包到集群测试与 DataStream 相同!
2.4、MySQL 到 Doris 的 StreamingETL 实现(3.0)
2.4.1、环境准备
1)安装 FlinkCDC
[atguigu@hadoop102 software]$ tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/
2)拖入 MySQL 以及 Doris 依赖包
将flink-cdc-pipeline-connector-doris-3.0.0.jar
以及flink-cdc-pipeline-connector-mysql-3.0.0.jar
防止在FlinkCDC 的 lib 目录下。
2.4.2、同步变更
2)编写 MySQL 到 Doris 的同步变更配置文件
在 FlinkCDC 目录下创建 job 文件夹,并在该目录下创建同步变更配置文件。
[atguigu@hadoop102 flink-cdc-3.0.0]$ mkdir job/
[atguigu@hadoop102 flink-cdc-3.0.0]$ cd job
[atguigu@hadoop102 job]$ vim mysql-to-doris.yaml
source:
type: mysql
hostname: hadoop103
port: 3306
username: root
password: "000000"
tables: test.\.*
server-id: 5400-5404
server-time-zone: UTC+8
sink:
type: doris
fenodes: hadoop102:7030
username: root
password: "000000"
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
3)启动任务并测试
(1)开启 Flink 集群,注意:在 Flink 配置信息中打开 CheckPoint。
[atguigu@hadoop103 flink-1.18.0]$ vim conf/flink-conf.yaml
添加如下配置信息:
execution.checkpointing.interval: 5000
分发该文件至其他 Flink 机器并启动 Flink 集群:
[atguigu@hadoop103 flink-1.18.0]$ bin/start-cluster.sh
(2)开启 Doris FE
[atguigu@hadoop102 fe]$ bin/start_fe.sh
(3)开启 Doris BE
[atguigu@hadoop102 be]$ bin/start_be.sh
(4)在 Doris 中创建 test 数据库
[atguigu@hadoop103 doris]$ mysql -uroot -p000000 -P9030 -hhadoop102
mysql> create database test;
Query OK, 1 row affected (0.00 sec)
mysql> create database doris_test_route;
Query OK, 1 row affected (0.00 sec)
(5)启动 FlinkCDC 同步变更任务
[atguigu@hadoop103 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml
(6)刷新 Doris 中 test 数据库观察结果。
(7)在 MySQL 的 test 数据中对应的几张表进行新增、修改数据以及新增列操作,并刷新 Doris 中 test 数据库观察结果。
2.4.3 路由变更
1)编写 MySQL 到 Doris 的路由变更配置文件
source:
type: mysql
hostname: hadoop103
port: 3306
username: root
password: "000000"
tables: test_route.\.*
server-id: 5400-5404
server-time-zone: UTC+8
sink:
type: doris
fenodes: hadoop102:7030
benodes: hadoop102:7040
username: root
password: "000000"
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: test_route.t1
sink-table: doris_test_route.doris_t1
- source-table: test_route.t2
sink-table: doris_test_route.doris_t1
- source-table: test_route.t3
sink-table: doris_test_route.doris_t3
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
2)启动任务并测试
[atguigu@hadoop103 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris-route.yaml
3)刷新 Doris 中 test_route 数据库观察结果。
4)在 MySQL 的 test_route 数据中对应的几张表进行新增、修改数据操作,并刷新 Doris 中 doris_test_route 数据库观察结果。
评论