SeaTunnel

SeaTunnel

一、Seatunnel 概述

1.1、SeaTunnel

SeaTunnel 是一个简单易用的数据集成框架,在企业中,由于开发时间或开发部门不通用,往往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行。数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享。SeaTunnel 支持海量数据的实时同步。它每天可以稳定高效地同步数百亿数据。并已用于近 100 家公司的生产。
SeaTunnel 的前身是 Waterdrop(中文名:水滴)自 2021 年 10 月 12日更名为 SeaTunnel。2021年 12月 9日,SeaTunnel 正式通过 Apache 软件基金会的投票决议,以全票通过的优秀表现正式成为 Apache 孵化器项目。2022 年 3 月 18 日社区正式发布了首个 Apache 版本 v2.1.0。

1.2、SeaTunnel 做什么

本质上,SeaTunnel 不是对 Saprk 和 Flink 的内部修改,而是在 Spark 和 Flink 的基础上做了一层包装。它主要运用了控制反转的设计模式,这也是 SeaTunnel 实现的基本思想。
SeaTunnel 的日常使用,就是编辑配置文件。编辑好的配置文件由 SeaTunnel 转换为具体的 Spark 或 Flink 任务。如图所示。

1.3、SeaTunnel 的应用场景

SeaTunnel 适用于以下场景:

  • 海量数据的同步
  • 海量数据的集成
  • 海量数据的 ETL
  • 海量数据聚合
  • 多源数据处理

SeaTunnel 的特点:

  • 基于配置的低代码开发,易用性高,方便维护。
  • 支持实时流式传输
  • 离线多源数据分析
  • 高性能、海量数据处理能力
  • 模块化的插件架构,易于扩展
  • 支持用 SQL 进行数据操作和数据聚合
  • 支持 Spark structured streaming
  • 支持 Spark 2.x

目前 SeaTunnel 的长板是他有丰富的连接器,又因为它以 Spark 和 Flink 为引擎。所以可以很好地进行分布式的海量数据同步。通常 SeaTunnel 会被用来做出仓入仓工具,或者被用来进行数据集成。比如,唯品会就选择用 SeaTunnel 来解决数据孤岛问题,让 ClcikHouse 集成到了企业中先前的数据系统之中。如图所示:

下图是 SeaTunnel 的工作流程:

1.5、SeaTunnel 目前的插件支持

1.5.1、Spark 连接器插件(Source)


这部分内容来官网,可以看出社区目前规划了大量的插件,但截至 V2.1.0 可用的 transform 插件的数量还是很少的。
官方网址:https://seatunnel.apache.org/zh-CN/

二、Seatunnel 安装和使用

注意 v2.1.0 中有少量 bug,要想一次性跑通所有示例程序,需使用自己编译的包。

2.1、SeaTunnel 的环境依赖

截至 SeaTunnel V2.1.0。
SeaTunnel 支持 Spark 2.x(尚不支持 Spark 3.x)。支持 Flink 1.9.0 及其以上的版本。
Java 版本需要 >=1.8
如下演示时使用的是 flink 版本是 1.13 。

2.2、SeaTunnel 的下载和安装

1)使用 wget 下载 SeaTunnel,使用-O参数将文件命名为 seatunnel-2.1.0.tar.gz

wget https://downloads.apache.org/incubator/seatunnel/2.1.0/apacheseatunnel-incubating-2.1.0-bin.tar.gz -O seatunnel-2.1.0.tar.gz2

2)解压下载好的 tar.gz 包

tar -zxvf seatunnel-2.1.0.tar.gz -C /opt/module/

3)查看解压的目标路径,apache-seatunnel-incubating-2.1.0 的目录就是我们已经安装好的 seatunnel。Incubating 的意思是孵化中。

2.3、SeaTunnel 的依赖环境配置

在 config/目录中有一个 seatunnel-env.sh 脚本。我们可以看一下里面的内容。

这个脚本中声明了 SPARK_HOME 和 FLINK_HOME 两个路径。默认情况下 seatunnelenv.sh 中的 SPARK_HOME 和 FLINK_HOME 就是系统环境变量中的 SPARK_HOME 和 FLINK_HOME。
在 shell 脚本中:-的意思是如果:-前的内容为空,则替换为后面的。
例如,环境变量中没有 FLINK_HOME。那么 SeaTunnel 运行时会将 FLINK_HOME 设 为/opt/flink。
如果你机器上的环境变量 SPARK_HOME 指向了 3.x 的一个版本。但是想用 2.x 的 Spark 来试一下 SeaTunnel。这种情况下,如果你不想改环境变量,那就直接在 seatunnel-env.sh 中将 2.x 的路径赋值给 SPARK_HOME 即可。

2.4、示例 1: SeaTunnel 快速开始

1)选择任意路径,创建一个文件。这里我们选择在 SeaTunnel 的 config 路径下创建一个
example01.conf

[atguigu@hadoop102 config]$ vim example01.conf

2)在文件中编辑如下内容
配置 Spark 或 Flink 的参数

env {
 # You can set flink configuration here
 execution.parallelism = 1
 #execution.checkpoint.interval = 10000
 #execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
}
# 在 source 所属的块中配置数据源
source {
 SocketStream{
host = hadoop102
 result_table_name = "fake"
 field_name = "info"
 }
}
# 在 transform 的块中声明转换插件
transform {
 Split{
 separator = "#"
 fields = ["name","age"]
 }
 sql {
sql = "select info, split(info) as info_row from fake"
}
}
# 在 sink 块中声明要输出到哪
sink {
 ConsoleSink {}
}

3)开启 flink 集群

[atguigu@hadoop102 flink-1.11.6]$ bin/start-cluster.sh

4)开启一个 netcat 服务来发送数据

[atguigu@hadoop102 ~]$ nc -lk 9999

5)使用 SeaTunnel 来提交任务。
在 bin 目录下有以下内容
start-seatunnel-flink.sh 是用来提交 flink 任务的。start-seatunnel-spark.sh 是用来提交 Spark 任务的。这里我们用 flink 演示。所以使用 start-seatunnel-flink.sh
用--config 参数指定我们的应用配置文件。

bin/start-seatunnel-flink.sh --config config/example01.sh

等待弹出 Job 已经提交的提示。
6)在 netcat 上发送数据
7)在 Flink webUI 上查看输出结果。

8)小结
至此,已经跑完了一个官方案例。它以 Socket 为数据源。经过 SQL 的处理,最终输出到控制台。在这个过程中,我们并没有编写具体的 flink代码,也没有手动去打 jar包。我们只是将数据的处理流程声明在了一个配置文件中。
在背后,是 SeaTunnel 帮我们把配置文件翻译为具体的 flink 任务。配置化,低代码,易维护是 SeaTunnel 最显著的特点。

三、SeaTunnel 基本原理

3.1、SeaTunnel 的启动脚本

3.1.1、启动脚本的参数

截至目前,SeaTunnel 有两个启动脚本。
提交 spark 任务用 start-seatunnel-spark.sh
提交 flink 任务则用 start-seatunnel-flink.sh
本文主要是结合 flink 来使用 seatunnel 的,所以用 start-seatunnel-flink.sh
start-seatunnle-flink.sh 可以指定 3 个参数
分别是:

  • --config 应用配置的路径
  • --variable 应用配置里的变量赋值
  • --check 检查 config 语法是否合法

3.1.2、--check 参数

截至本文档撰写时的 SeaTunnel 版本 v2.1.0。check 功能还尚在开发中,因此--check 参数是一个虚设。目前 start-seatunnel-flink.sh并不能对应用配置文件的语法合法性进行检查。而且 start-seatunnel-flink.sh 中目前没有对--check 参数的处理逻辑。
需要注意!使用过程中,如果没有使用--check 参数,命令行一闪而过。那就是你的配置文件语法有问题。

3.1.3、--config 参数和--variable 参数

--config 参数用来指定应用配置文件的路径。
--variable 参数可以向配置文件传值。配置文件内是支持声明变量的。然后我们可以通过命令行给配置中的变量赋值。
变量声明语法如下。

sql {
 sql = "select * from (select info,split(info) from fake) where age > '"${age}"'"
 }

在配置文件的任何位置都可以声明变量。并用命令行参数--variable key=value 的方式将变量值传进去,你也可以用它的短命令形式 -i key=value。传递参数时,key 需要和配置文件中声明的变量名保持一致。
如果需要传递多个参数,那就在命令行里面传递多个-i 或--variable key=value。
比如:

bin/start-seatunnel-flink.sh --config/xxx.sh -i age=18 -i sex=man

3.1.4、示例 2:配置中使用变量

1)我们在 example01.conf 的基础上创建 example02.conf

[atguigu@hadoop102 config]$ cp example01.sh example02.sh

2)修改文件

[atguigu@hadoop102 config]$ vim example02.sh

3)给 sql 插件声明一个变量,红色的是我们修改的地方。最终的配置文件如下。

env {
 execution.parallelism = 1
}
source {
 SocketStream{
 result_table_name = "fake"
 field_name = "info"
 }
}
transform {
 Split{
 separator = "#"
 fields = ["name","age"]
 }
 sql {
    sql = "select * from (select info, split(info) from fake) 
    where age > '"${age}"'"
    # 需要套一层子查询,因为 where 先于 select,split 出的字段无法用 where 过滤
 }
}
sink {
 ConsoleSink {}
}

4)开启 netcat 服务

[atguigu@hadoop102 ~]nc -l 9999

5)使用 SeaTunnel 来提交任务。-i age=18 往命令行中

bin/start-seatunnel-flink.sh --config config/example01.sh -i age=18

6)接着,用 nc 发送几条数据看看效果。
7)在 flink 的 webUI 上看一下控制台的输出。最终发现未满 18 岁的李四被过滤掉了。

8)小结
通过传递变量,我们可以实现配置文件的复用。让同一份配置文件就能满足不同的业务需求。

在启动脚本的尾部,我们可以看到,start-seatunnel-flink.sh 会执行(exec)一条命令,这个命令会使用 flink 的提交脚本去向集群提交一个任务。而且在调用 bin/flink run 的时候,还传递了 PARAMS 作为 flink run 的参数。
如下图所示,我们可知,凡是--config 和 --variable 之外的命令行参数都被放到 PARAMS 变量中,最后相当于给 flink run 传递了参数。注意!命令行参数解析过程中没有涉及--check 参数处理。这也是为什么说它目前不支持--check 操作。
比如,我们可以在 seatunnel 启动脚本中,指定 flink job 并行度。

bin/start-seatunnel-flink.sh --config config/ -p 2\

3.2、SeaTunnel 的配置文件

3.2.1、应用配置的 4 个基本组件

从 SeaTunnel 的 app 配置文件开始讲起。
一个完整的 SeaTunnel 配置文件应包含四个配置组件。分别是:

env{} source{} --> transform{} --> sink{}

在 Source和 Sink数据同构时,如果业务上也不需要对数据进行转换,那么 transform中 的内容可以为空。具体需根据业务情况来定。

3.2.2、env 块

env 块中可以直接写 spark 或 flink 支持的配置项。比如并行度,检查点间隔时间。检查点 hdfs 路径等。在 SeaTunnel 源码的 ConfigKeyName 类中,声明了 env 块中所有可用的 key。
如图所示:

3.2.3、SeaTunnel 中的核心数据结构 Row

Row 是 SeaTunnel 中数据传递的核心数据结构。对 flink 来说,source 插件需要给下游的转换插件返回一个 DataStream,转换插件接到上游的 DataStream进行处理后需要再给下游返回一个 DataStream。最后 Sink 插件将转换插件处理好的 DataStream输出到外部的数据系统。

因为 DataStream可以很方便地和 Table 进行互转,所以将 Row 当作核心数据结构可以让转换插件同时具有使用代码(命令式)和 sql(声明式)处理数据的能力。

3.2.4、source 块

source 块是用来声明数据源的。source 块中可以声明多个连接器。比如:

# 伪代码
env {
 ...
}
source {
 hdfs { ... } 
 elasticsearch { ... }
 jdbc {...}
}
transform {
 sql {
 sql = """
 select .... from hdfs_table 
 join es_table 
 on hdfs_table.uid = es_table.uid where ..."""
 }
}
sink {
 elasticsearch { ... }
}

需要注意的是,所有的 source 插件中都可以声明 result_table_name。如果你声明了 result_table_name。SeaTunnel 会将 source 插件输出的 DataStream转换为 Table 并注册在Table环境中。当你指定了result_table_name,那么你还可以指定field_name,在注册时,给 Table 重设字段名(后面的案例中会讲解)。
因为不同 source 所需的配置并不一样,所以对 source 连接器的配置最好参考官方的文档。

3.2.5、transform 块

目前社区对插件做了很多规划,但是截至 v2.1.0 版本,可用的插件总共有两个,一个
是 Split,另一个是 sql。
transform{}块中可以声明多个转换插件。所有的转换插件都可以使用source_table_name,和 result_table_name。同样,如果我们声明了 result_table_name,那么我们就能声明 field_name。
需要着重了解一下 Split 插件和 sql 插件的实现。但在此在 SeaTunnel 中,一个转换插件的实现类最重要的逻辑在下述四个方法中。
1)处理批数据,DataSet进,DataSet

DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data)

2)处理流数据,DataStram进,DataStream

DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row> dataStream)

3)函数名叫注册函数。实际上,这是一个约定,它只不过是每个 transform 插件作用于流之后调用的一个函数。

void registerFunction(FlinkEnvironment env, DataStream<Row> datastream)

4)处理一些预备工作,通常是用来解析配置。

void prepare(FlinkEnvironment prepareEnv)

Split 插件的实现
现在我们需要着重看一下 Split 插件的实现。
先回顾一下之前 example01.conf 中关于 transform 的配置。
发现 Split 插件并没有对数据流进行任何的处理,而是将它直接 return 了。反之,它向表环境中注册了一个名为 split 的 UDF(用户自定义函数)。而且,函数名是写死的。这意味着,如果你声明了多个 Split,后面的 UDF 还会把前面的覆盖。
这是开发时需要注意的一个点.

但是,需要注意,tranform 接口其实是留给了我们直接操作数据的能力的。也就是 processStream 方法。那么,一个 transform 插件其实同时履行了 process 和 udf 的职责,这是违反单一职责原则的。那要判断一个转换插件在做什么就只能从源码和文档的方面来加以区分了。

最后需要叮嘱的是,指定 soure_table_name 对于 sql 插件的意义不大。因为 sql 插件可以通过 from 子句来决定从哪个表里抽取数据。

3.2.6、sink 块

Sink 块里可以声明多个 sink 插件,每个 sink 插件都可以指定 source_table_name。不过因为不同 Sink 插件的配置差异较大,所以在实现时建议参考官方文档。

3.3、SeaTunnel 的基本原理

SeaTunnel 的工作原理简单明了。
1)程序会解析你的应用配置,并创建环境
2)配置里source{},transform{},sink{}三个块中的插件最终在程序中以List集合的方式存在。
3)由Excution对象来拼接各个插件,这涉及到选择source_table,注册result_table等流程,注册 udf 等流程。并最终触发执行。
可以参考下图:

3.4、小结

最后我们用一张图将 SeaTunnel 中的重要概念串起来。

如果你愿意,依托 sql 插件和 udf。单个配置文件也可以定义出比较复杂的工作流。但 SeaTunnel 的定位是一个数据集成平台。核心的功能是依托丰富的连接器进行数据同步,数据处理并不是 SeaTunnel 的长处。所以在 SeaTunnel 中定义复杂的工作流或许是一种不值得提倡的做法。
需要提醒的是,如果你不指定 source_table_name,插件会使用它在配置文件上最近的上一个插件的输出作为输入。
所以,我们可以通过使用依托表名表环境来实现复杂的工作流。

也可以减少表名的使用实现简单的数据同步通道。

评论

暂无

添加新评论