Dinky

Dinky

一、概述

1.1、简介


Dinky 是一个开箱即用、易扩展,以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架的一站式实时计算平台,致力于流批一体和湖仓一体的探索与实践。
官网:http://www.dlink.top/

1.2、核心特性

1)沉浸式
提供专业的 DataStudio 功能,支持全屏开发、自动提示与补全、语法高亮、语句美化、语法校验、 调试预览结果、全局变量、MetaStore、字段级血缘分析、元数据查询、FlinkSQL 生成等功能。
2)易用性
Flink 多种执行模式无感知切换,支持 Flink 多版本切换,自动化托管实时任务、恢复点、报警等, 自定义各种配置,持久化管理的 Flink Catalog。
3)增强式
兼容且增强官方 FlinkSQL 语法,如 SQL 表值聚合函数、全局变量、CDC 整库同步、执行环境、 语句合并、共享会话等。
4)一站式
提供从 FlinkSQL 开发调试到上线下线的运维监控及 SQL 的查询执行能力,使数仓建设及数据治理一体化。
5)易扩展
源码采用 SPI 插件化及各种设计模式支持用户快速扩展新功能,如连接器、数据源、报警方式、 Flink Catalog、CDC 整库同步、自定义 FlinkSQL 语法等。
6)无侵入
Spring Boot 轻应用快速部署,不需要在任何 Flink 集群修改源码或添加额外插件,无感知连接和监控Flink 集群。

1.3、功能

  • 沉浸式 FlinkSQL 数据开发:自动提示补全、语法高亮、语句美化、在线调试、语法校验、执行计划、MetaStore、血缘分析、版本对比等。
  • 支持 FlinkSQL 多版本开发及多种执行模式:Local、Standalone、Yarn/Kubernetes Session、Yarn Per-Job、Yarn/Kubernetes Application。
  • 支持 Apache Flink 生态:Connector、FlinkCDC、Table Store 等。
  • 支持 FlinkSQL 语法增强:表值聚合函数、全局变量、执行环境、语句合并、整库同步等。
  • 支持 FlinkCDC 整库实时入仓入湖、多库输出、自动建表、模式演变。
  • 支持 Flink Java / Scala / Python UDF 开发与自动提交。
  • 支持 SQL 作业开发:ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、Presto、SqlServer、StarRocks 等。
  • 支持实时在线调试预览 Table、 ChangeLog、统计图和 UDF。
  • 支持 Flink Catalog、数据源元数据在线查询及管理。
  • 支持自动托管的 SavePoint/CheckPoint 恢复及触发机制:最近一次、最早一次、指定一次等。
  • 支持实时任务运维:上线下线、作业信息、集群信息、作业快照、异常信息、数据地图、数据探查、历史版本、报警记录等。
  • 支持作为多版本 FlinkSQL Server 以及 OpenApi 的能力。
  • 支持实时作业报警及报警组:钉钉、微信企业号、飞书、邮箱等。
  • 支持多种资源管理:集群实例、集群配置、Jar、数据源、报警组、报警实例、文档、系统配置等。
  • 支持企业级管理功能:多租户、用户、角色、命名空间等。

二、安装部署

Dinky 不依赖任何外部的 Hadoop 或者 Flink 环境,可以单独部署在 flink、 hadoop 和 K8S 集群之外,完全解耦,支持同时连接多个不同的集群实例进行运维。

2.1、解压到指定目录

上传安装包并解压:

tar -zxvf dlink-release-0.7.3.tar.gz -C /opt/module/
mv dlink-release-0.7.3 dinky
cd dinky

2.2、初始化MySQL数据库

Dinky 采用 mysql 作为后端的存储库,部署需要 MySQL5.7 以上版本,这里我们使用的 MySQL 是 8.0。
在 Dinky 根目录 sql 文件夹下分别放置了初始化的dinky.sql文件、升级使用的upgrade/${version}_schema/mysql/ddl 和 dml。如果第一次部署,可以直接将 sql/dinky.sql 文件在 dinky 数据库下执行。(如果之前已经部署,那 upgrade 目录下存放了各版本的升级 sql ,根据版本号按需执行即可)。
在MySQL中操作如下:

#创建数据库
mysql>
CREATE DATABASE dinky;

#创建用户并允许远程登录
mysql>
create user 'dinky'@'%' IDENTIFIED WITH mysql_native_password by 'dinky';

#授权
mysql>
grant ALL PRIVILEGES ON dinky.* to 'dinky'@'%';

mysql>
flush privileges;

登录创建好的dinky用户,执行初始化sql文件

mysql -udinky -pdinky
mysql>
use dinky;

mysql> source /opt/module/dinky/sql/dinky.sql

2.3、修改配置文件

修改 Dinky 连接 mysql 的配置文件。

cd /opt/module/dinky
vim ./config/application.yml

spring:
  datasource:
    url: jdbc:mysql://${MYSQL_ADDR:hadoop102:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: ${MYSQL_USERNAME:dinky}
    password: ${MYSQL_PASSWORD:dinky}
    driver-class-name: com.mysql.cj.jdbc.Driver
  application:
    name: dinky
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
    format:
      date: yyyy-MM-dd HH:mm:ss
    #json格式化全局配置
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss

  main:
    allow-circular-references: true

2.4、加载依赖

1)加载Flink依赖
Dinky 需要具备自身的 Flink 环境,该 Flink 环境的实现需要用户自己在 Dinky 根目录下 plugins/flink${FLINK_VERSION} 文件夹并上传相关的 Flink 依赖。当然也可在启动文件中指定 FLINK_HOME,但不建议这样做。

cp /opt/module/flink-1.17.0/lib/* /opt/module/dinky/plugins/flink1.17

2)加载Hadoop依赖
Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-shade-hadoop ,需要额外添加 flink-shade-hadoop-uber-3 包。对于 dinky 来说,Hadoop 3 的 uber 依赖可以兼容 hadoop 2。
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar上传到/opt/module/dinky/plugins目录下。

2.5、上传jar包

使用 Application 模式时,需要将 flink 和 dinky 相关的包上传到 HDFS。
1)创建 HDFS 目录并上传 dinky 的 jar 包

hadoop fs -mkdir -p /dinky/jar/
hadoop fs -put /opt/module/dinky/jar/dlink-app-1.17-0.7.3-jar-with-dependencies.jar /dinky/jar

2)创建 HDFS 目录并上传 flink 的 jar 包

hadoop fs -mkdir /flink-dist
hadoop fs -put /opt/module/flink-1.17.0/lib /flink-dist
hadoop fs -put /opt/module/flink-1.17.0/plugins /flink-dist

2.6、启停命令

1)启动命令

cd /opt/module/dinky
sh auto.sh start 1.17

服务启动后,默认端口 8888,http://hadoop102:8888 , 默认用户名/密码: admin/admin
2)停止命令
停止命令不需要携带 Flink 版本号。

cd /opt/module/dinky
sh auto.sh stop

3)重启命令

cd /opt/module/dinky
sh auto.sh restart 1.17

2.7、Flink设置

使用 Application 模式以及 RestAPI 时,需要修改相关Flink配置:配置中心 >> Flink 配置。
将“提交 FlinkSQL 的 Jar 文件路径”修改为2.5中 dlink-app 包的路径,点击保存。

三、快速上手

3.1、创建集群

提交 FlinkSQL 作业时,首先要保证安装了 Flink 集群。Flink 当前支持的集群模式包括:

  • Standalone 集群
  • Yarn 集群
  • Kubernetes 集群

对于以上的三种集群而言,Dinky 为用户提供了两种集群管理方式,一种是集群实例管理,一种是集群配置管理。

3.1.1、Flink实例管理

集群实例管理适用于 Standalone,Yarn Session 和 Kubernetes Session 这三种集群实例的注册,其他类型的集群只能查看作业信息。对于已经注册的集群实例,可以对集群实例做编辑、删除、搜索、心跳检测和回收等。
注册中心 >> 集群管理 >> Flink实例管理,点击新建 Flink 集群。

1)注册Standalone集群
需要先手动启动Standalone集群:
/opt/module/flink-1.17.0/bin/start-cluster.sh
注册集群。

2)注册 Yarn Session 集群
需要先手动启动 Yarn Session 集群:
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
如果 flink 的配置文件没有指定 rest.port,那么记住启动后的主机名、端口,还有启动后的 application_id:
注册集群。

3.1.2、集群配置管理

集群配置管理适用于 Yarn Per-job、Yarn Application 和 Kubernetes Application 这三种类型配置。对于已经注册的集群配置,可以对集群配置做编辑、删除和搜索等。

四、特色功能

4.1、持久化的 Catalog

dinky 自己实现了 mysql-catalog,作用同 hive-catalog,可以持久化Flink元数据,在作业中无需再显式声明 DDL 语句,直接三段式引用元数据即可。

4.2、变量定义

4.2.1、定义变量

1)语法结构

key1 := value1;

2)示例

var1:=source;

--创建源表 source
CREATE TABLE ${var1}(
  id  BIGINT,
  name STRING
) WITH (
  'connector' = 'datagen'
);

select * from ${var1};

作业配置需要开启“全局变量”:

4.2.2、查看变量

1)注册全局变量
2)在FlinkSQLEnv中定义变量
dinky 支持将 FlinkSQL 封装为执行环境,供 FlinkSQL 任务使用。在执行 FlinkSQL 时,会先执行 FlinkSqlEnv 内的语句。前面作业配置中指定的FlinkSQL环境“DefaultCatalog”,就是 dinky 默认的 FlinkSQLEnv。
FlinkSQLEnv 场景适用于所有作业的 SET、DDL 语法统一管理的场景,当前FlinkSQLEnv 在 SQL 编辑器的语句限制在1000行以内。
新建作业,类型选择 FlinkSqlEnv:
3)查看变量

-- 查看所有变量
SHOW FRAGMENTS;
-- 查看单个变量
SHOW FRAGMENT var2;

执行按钮只执行到第一个 Select 或 Show 的语句,后续不执行。如果在 FlinkSQL 中定义时(比如上文定义的var1),要show fragments,需要执行完整的语句。

4.3、ADD JAR

ADD JAR 语句用于将用户 jar 添加到 classpath。可作用于standalone、session和 application 模式 。当连接器和第三方依赖过多时,经常容易导致 jar依赖冲突,add jar可以选择性的识别添加到服务器,做到环境隔离。

ADD JAR '<path_to_filename>.jar'

使用语法和 sql-client一致:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
比如,通过 add jar 的方式添加 mysql-cdc 的 jar 包。

add jar '/opt/software/flink-sql-connector-mysql-cdc-2.3.0.jar';

DROP TABLE IF EXISTS activity_info;
CREATE TABLE IF NOT EXISTS activity_info (
    `id` BIGINT NOT NULL COMMENT '活动id'
    ,`activity_name` STRING COMMENT '活动名称'
    ,`activity_type` STRING COMMENT '活动类型(1:满减,2:折扣)'
    ,`activity_desc` STRING COMMENT '活动描述'
    ,`start_time` TIMESTAMP COMMENT '开始时间'
    ,`end_time` TIMESTAMP COMMENT '结束时间'
    ,`create_time` TIMESTAMP COMMENT '创建时间'
    ,`operate_time` TIMESTAMP COMMENT '修改时间'
    ,PRIMARY KEY ( `id` ) NOT ENFORCED
) COMMENT '活动表'
 WITH (
     'connector' = 'mysql-cdc'
    ,${mysql102}
    ,'scan.incremental.snapshot.enabled' = 'true'
    ,'debezium.snapshot.mode'='latest-offset'
    ,'database-name' = 'gmall'
    ,'table-name' = 'activity_info'

);

CREATE  TABLE print
WITH (
    'connector' = 'print'
)
LIKE activity_info (EXCLUDING ALL);

insert into print select * from activity_info;

4.4、CDCSOURCE 整库同步

目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等
Dinky 采用的是只构建一个 source,然后根据 schema、database、table 进行分流处理,分别 sink 到对应的表。
CDCSOURCE 语句用于将上游指定数据库的所有表的数据采用一个任务同步到下游系统。整库同步默认支持 Standalone、Yarn Session、Yarn Per job、K8s Session。
Yarn Application也支持,参考:http://www.dlink.top/docs/next/data_integration_guide/cdcsource_statements
这里以 Yarn Session 模式为例,整库同步到 Kafka 中

4.4.1、使用语法

EXECUTE CDCSOURCE jobname 
  WITH ( key1=val1, key2=val2, ...)
WITH 参数通常用于指定 CDCSOURCE 所需参数,语法为'key1'='value1', 'key2' = 'value2'的键值对。

4.4.2、准备工作


http://www.dlink.top/docs/0.7/extend/practice_guide/cdc_kafka_multi_source_merge
1)启动Kafka
2)向Flink添加Dinky依赖

cp /opt/module/dinky/lib/dlink-common-0.7.3.jar /opt/module/flink-1.17.0/lib/
cp /opt/module/dinky/lib/dlink-client-base-0.7.3.jar /opt/module/flink-1.17.0/lib/
cp /opt/module/dinky/plugins/flink1.17/dinky/dlink-client-1.17-0.7.3.jar /opt/module/flink-1.17.0/lib/

3)添加连接器依赖
Dinky 和 Flink 都需要添加:

cp /opt/software/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/module/flink-1.17.0/lib
cp /opt/software/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/module/dinky/plugins

cp /opt/software/flink-sql-connector-kafka-1.17.0.jar /opt/module/flink-1.17.0/lib
cp /opt/software/flink-sql-connector-kafka-1.17.0.jar /opt/module/dinky/plugins/flink1.17

4)重启Yarn-Session集群
Session 和 Standalone 这种需要事先启动集群的模式,依赖发生改变,需要重启集群才能生效。重启后需要在 dinky 上修改Flink实例的配置。
5)重启 dinky

cd /opt/module/dinky
sh auto.sh restart 1.17

4.4.3、实时数据合并至一个Kafka Topic

EXECUTE CDCSOURCE cdc1 WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop102',
  'port' = '3306',
  'username' = 'root',
  'password' = '000000',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'gmall\.activity_info,gmall\.activity_rule',
  'sink.connector'='datastream-kafka',
  'sink.topic'='dlinkcdc',
  'sink.properties.transaction.timeout.ms'='60000',
  'sink.brokers'='hadoop102:9092'
);

注意:sinkProducer的超时时间默认为1个小时,但是kafka broker的超时时间默认是15分钟, kafka broker不允许producer的超时时间比他大,同时要比checkpoint间隔大,否则报错如下:

The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).

所以这里设置为60秒,小于 broker 最大超时15分钟,大于 checkpoint 间隔。
从webui可以看到,source是只有一个:

4.4.4、实时数据合并至对应Kafka Topic

不指定sink.topic,就是写入对应的 Topic:

EXECUTE CDCSOURCE cdc2 WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop102',
  'port' = '3306',
  'username' = 'root',
  'password' = '000000',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'gmall\.activity_info,gmall\.activity_rule',
  'sink.connector'='datastream-kafka',
  'sink.properties.transaction.timeout.ms'='60000',
  'sink.brokers'='hadoop102:9092'
);

评论

暂无

添加新评论