Hudi Integration with Spark


1. Environment Preparation

1.1 Installing Spark

1) Spark versions supported by Hudi

Hudi                Supported Spark 3 version
0.12.x              3.3.x,3.2.x,3.1.x
0.11.x              3.2.x(default build, Spark bundle only),3.1.x
0.10.x              3.1.x(default build), 3.0.x
0.7.0-0.9.0         3.0.x
0.6.0 and prior     Not supported

2) Download and extract the Spark binary

cd /opt/software/
wget https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
tar -zxvf spark-3.2.2-bin-hadoop3.2.tgz -C /opt/module/
mv /opt/module/spark-3.2.2-bin-hadoop3.2 /opt/module/spark-3.2.2

3) Configure environment variables

sudo vim /etc/profile.d/my_env.sh

export SPARK_HOME=/opt/module/spark-3.2.2
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/my_env.sh

4) Copy the compiled Hudi bundle into Spark's jars directory

cp /opt/software/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/module/spark-3.2.2/jars

1.2 Start Hadoop

2. Using spark-shell

2.1 Starting spark-shell

1) Launch command

# For Spark 3.2
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

2) Set the table name, base path, and data generator

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

There is no separate table-creation step: if the table does not exist, the first batch of writes will create it.

2.2 Inserting data

Generate some new records, load them into a DataFrame, and write the DataFrame into the Hudi table.

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

mode(Overwrite) drops and recreates the table if it already exists. You can check whether data files have been generated under /tmp/hudi_trips_cow.

cd /tmp/hudi_trips_cow/
ls

2.3 Querying data

1) Load into a DataFrame

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

Note: with earlier Hudi versions, the load path had to be globbed down to the partition directories, e.g. load(basePath + "/*/*/*/*"); the current version does not require this.
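
For reference, on those earlier releases the snapshot load would look roughly like the sketch below (the number of "/*" segments must match the partition depth of the table):

// Older Hudi releases only: glob down through every partition level
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
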
2) Query

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

3) Time travel query
Hudi has supported time travel queries since 0.9.0. Three timestamp formats are currently supported, as shown below.

spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108100").
  load(basePath)

spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14:11:08.200").
  load(basePath)

// Equivalent to "as.of.instant = 2021-07-28 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)

2.4 Updating data

Similar to inserting new data, use the data generator to produce updates to existing records, load them into a DataFrame, and write the DataFrame into the Hudi table.

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

Note: the save mode is now Append. In general, always use Append mode unless you are creating the table for the first time. Querying the data again will show the updated trip records. Each write operation generates a new commit identified by a timestamp. Look for changes in the _hoodie_commit_time, rider, and driver fields for the same _hoodie_record_keys seen in previous commits.
To query the updated data, reload the Hudi table:

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

2.5 Incremental query

Hudi also provides incremental queries, which return the stream of records changed since a given commit timestamp. You must specify the beginTime for the incremental query and can optionally specify an endTime. If you want all changes after a given commit (the common case), there is no need to specify an endTime.
1) Reload the data

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

2) Obtain the beginTime

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // use the second-to-last commit time as beginTime

3) Create a temp view for the incremental query

val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

4) Query the incremental view

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

This returns only records committed after beginTime with fare > 20.
With incremental queries, we can build streaming pipelines on top of batch data.

2.6 Point-in-time query

To query data as of a specific point in time, set endTime to that instant and set beginTime to "000" (meaning the earliest commit time).
1) Specify beginTime and endTime

val beginTime = "000" // earliest possible commit time
val endTime = commits(commits.length - 2) // second-to-last commit time

2) Create a temp view bounded by the given instants

val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

3) Query

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

2.7 Deleting data

Records are deleted by the HoodieKeys passed in (uuid + partitionpath); deletes are only supported in Append mode.
1) Get the current row count

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

2) Pick 2 records to delete

val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

3) Build a DataFrame from the 2 records to be deleted

val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

4) Perform the delete

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

5) Count the rows after the delete to verify that it succeeded

val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")

// The count should be 2 less than before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

2.8 Insert overwrite

For a table or partition where most records change every cycle, doing an upsert or merge is inefficient. We want a Hive-style "insert overwrite" operation that ignores the existing data and creates a commit with only the new data provided.
It is also useful for certain operational tasks, such as repairing a specific broken partition: we can "insert overwrite" that partition with records from the source files. For some data sources this is much faster than restore-and-replay.
Insert overwrite can also be faster than upsert for batch ETL jobs that recompute the entire target partition every batch (including indexing, pre-combining, and other repartitioning steps).
1) View the current keys in the table

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

2) Generate some new trip data

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")

3) Overwrite the specified partition

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

4) Query the keys again; they have changed after the overwrite

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

3. Using Spark SQL

3.1 Creating tables

1) Start the Hive Metastore

nohup hive --service metastore & 

Then start spark-sql:

# For Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

If the Hive environment variables are not configured, manually copy hive-site.xml into Spark's conf directory.

2) Table creation parameters
The Hudi-specific table properties (tblproperties) used in the examples below are type ('cow' or 'mor', defaulting to cow), primaryKey (the record key field, defaulting to 'uuid'), and preCombineField (the pre-combine/ordering field).

3) Create non-partitioned tables
(1) Create a COW table with the default primaryKey 'uuid' and no preCombineField

create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;

(2) Create a non-partitioned MOR table

create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

4) Create a partitioned table
Create a partitioned COW external table, specifying primaryKey and preCombineField

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

5) Create a table over an existing Hudi table
Nothing needs to be specified other than the partition columns (if any); Hudi picks up the schema and table configuration automatically.
(1) Non-partitioned table

create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';

(2) Partitioned table

create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

6) Create a table via CTAS (Create Table As Select)
To improve the performance of loading data into a Hudi table, CTAS uses bulk insert as the write operation.
(1) Create a non-partitioned COW table via CTAS, without specifying a preCombineField

create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;

(2) Create a partitioned COW table via CTAS, specifying a preCombineField

create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

(3) Load data from another table via CTAS

-- Create a managed parquet source table
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

-- Load data via CTAS
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (datestr) as select * from parquet_mngd;

3.2 Inserting data

By default, if a preCombineKey is provided, insert into uses upsert as the write operation; otherwise it uses insert.
1) Insert into the non-partitioned tables

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2) Insert into a partitioned table with dynamic partitioning

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

3) Insert into a partitioned table with a static partition

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

4) Insert data with bulk_insert
Hudi supports bulk_insert as the write operation type; only two configurations need to be set:
hoodie.sql.bulk.insert.enable and hoodie.sql.insert.mode

-- Insert into a table that has a preCombineKey: the write operation is upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- Insert into the same table, this time with bulk_insert as the write operation
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

3.3 Querying data

1) Query

select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

2) Time travel query
Hudi has supported time travel queries since 0.9.0. The Spark SQL syntax requires Spark 3.2 or later.

-- Turn off the bulk_insert enabled earlier
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';


-- Insert one record with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- Update the record with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- Time travel based on the first commit time
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- Time travel using other timestamp formats
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;

3.4 Updating data

1) UPDATE
The update operation requires the table to have a preCombineField.
(1) Syntax

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

(2) Run updates

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

2) MERGE INTO
(1) Syntax

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> = a boolean equality condition
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

(2) Examples

-- 1. Prepare the source table: a non-partitioned Hudi table, and insert data
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;


-- 2. Prepare the source table: a partitioned parquet table, and insert data
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

3.5 Deleting data

1) Syntax

DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

2) Examples

delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- Delete using a non-primary-key field
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

3.6 Insert overwrite

Use the INSERT_OVERWRITE write operation type to overwrite partitions of a partitioned table.
Use the INSERT_OVERWRITE_TABLE write operation type to overwrite a non-partitioned table, or a partitioned table with dynamic partitioning.
1) insert overwrite on a non-partitioned table

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

2) insert overwrite table on a partitioned table with dynamic partitioning

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

3) insert overwrite on a partitioned table with a static partition

insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

3.7 Altering tables (ALTER TABLE)

1) Syntax

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')

2) Examples

--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

3.8 Modifying partitions

1) Syntax

-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier

2) Examples

--show partition:
show partitions hudi_cow_pt_tbl1;

--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');

Note: the show partitions output is based on the table path in the file system, so it is not accurate after deleting an entire partition's data or directly removing a partition directory.

3.9 Procedures

1) Syntax

--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)

2) Example
Available procedures: https://hudi.apache.org/docs/procedures/

--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);

4. Coding in an IDE (IDEA)

See the accompanying project code; a minimal sketch follows below.
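
The course materials keep the full project in a separate repository. As a minimal, self-contained sketch of the same write path in a standalone application (the object name, sample data, and local master URL are illustrative; it assumes the Spark 3.2 / Hudi 0.12 spark bundle used above is on the classpath):

import org.apache.spark.sql.{SaveMode, SparkSession}

object HudiDataFrameDemo {
  def main(args: Array[String]): Unit = {
    // Same Kryo/extension settings as the spark-shell examples above
    val spark = SparkSession.builder()
      .appName("hudi-dataframe-demo")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    import spark.implicits._

    val tableName = "hudi_trips_cow"
    val basePath  = "file:///tmp/hudi_trips_cow_idea"

    // A small in-memory DataFrame standing in for real input data
    val df = Seq(
      ("uuid-1", "americas/united_states/san_francisco", 19.5, 1000L),
      ("uuid-2", "americas/brazil/sao_paulo",            33.9, 1001L)
    ).toDF("uuid", "partitionpath", "fare", "ts")

    // Same write options as section 2, written out as full config keys
    df.write.format("hudi")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.table.name", tableName)
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Read it back to verify the write
    spark.read.format("hudi").load(basePath).show(false)

    spark.stop()
  }
}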

5. The DeltaStreamer Ingestion Tool

The HoodieDeltaStreamer tool (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities:

  • Exactly-once ingestion of new data from Kafka, plus incremental imports from the output of Sqoop or HiveIncrementalPuller, or from files under a DFS folder.
  • Supports JSON, Avro, or custom record types for the incoming data.
  • Manages checkpoints, rollback, and recovery.
  • Uses Avro schemas from DFS or the Confluent schema registry.
  • Supports custom transformations.

5.1 Command overview

Run the following command to view the help:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/software/hudi-0.12.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.0.jar --help

Schema provider and source configuration options: https://hudi.apache.org/docs/hoodie_deltastreamer
The example below uses the file-based schema provider and JsonKafkaSource:

5.2 Preparing Kafka data

(1) Start the Kafka cluster and create a test topic

bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic hudi_test

(2) Write a Java producer to send test data to the topic

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

<!-- fastjson <= 1.2.80 has known security vulnerabilities, so use 1.2.83 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

package com.atguigu.util;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class TestKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "-1");
        props.put("batch.size", "1048576");
        props.put("linger.ms", "5");
        props.put("compression.type", "snappy");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            JSONObject model = new JSONObject();
            model.put("userid", i);
            model.put("username", "name" + i);
            model.put("age", 18);
            model.put("partition", random.nextInt(100));
            producer.send(new ProducerRecord<String, String>("hudi_test", model.toJSONString()));
        }
        producer.flush();
        producer.close();
    }
}

5.3 Preparing the configuration files

(1) Define the required Avro schema files (source and target)

mkdir /opt/module/hudi-props/
vim /opt/module/hudi-props/source-schema-json.avsc
{        
  "type": "record",
  "name": "Profiles",   
  "fields": [
    {
      "name": "userid",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "username",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "age",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "partition",
      "type": [ "null", "string" ],
      "default": null
    }
  ]
}

cp source-schema-json.avsc target-schema-json.avsc

(2) Copy the Hudi base.properties configuration

cp /opt/software/hudi-0.12.0/hudi-utilities/src/test/resources/delta-streamer-config/base.properties /opt/module/hudi-props/ 

(3) Based on the template provided in the source code, write your own Kafka source configuration file

cp /opt/software/hudi-0.12.0/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties /opt/module/hudi-props/

vim /opt/module/hudi-props/kafka-source.properties 

include=hdfs://hadoop1:8020/hudi-props/base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=userid
hoodie.datasource.write.partitionpath.field=partition
# schema provider configs
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://hadoop1:8020/hudi-props/source-schema-json.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://hadoop1:8020/hudi-props/target-schema-json.avsc
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=hudi_test
#Kafka props
bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
auto.offset.reset=earliest
group.id=test-group

(4) Upload the configuration files to HDFS

hadoop fs -put /opt/module/hudi-props/ /

5.4 Copy the required jar into Spark

cp /opt/software/hudi-0.12.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.0.jar /opt/module/spark-3.2.2/jars/

hudi-utilities-bundle_2.12-0.12.0.jar must be placed under Spark's jars directory, otherwise the job fails with missing classes and methods.

5.5 Run the ingestion command

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.0.jar \
--props hdfs://hadoop1:8020/hudi-props/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field userid \
--target-base-path hdfs://hadoop1:8020/tmp/hudi/hudi_test  \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ

5.6 Check the ingestion result

(1) Start spark-sql

spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(2) Create a Hudi table over the target location

use spark_hudi;

create table hudi_test using hudi
location 'hdfs://hadoop1:8020/tmp/hudi/hudi_test';

(3) Query the Hudi table

select * from hudi_test;

6. Concurrency Control

6.1 Concurrency control supported by Hudi

1) MVCC
For table operations such as compaction, cleaning, and commits, Hudi uses multi-version concurrency control to provide snapshot isolation between these table operations and between writers and queries. Under the MVCC model, Hudi supports any number of concurrent operation jobs and guarantees that no conflicts occur; this is the default model. In the MVCC approach, all table services use the same writer, which guarantees there are no conflicts and avoids race conditions.

2) OPTIMISTIC CONCURRENCY
For write operations (upsert, insert, etc.), optimistic concurrency control lets multiple writers write data into the same table. Hudi provides file-level optimistic consistency: for any two commits (writes) to the same table, both are allowed to succeed as long as they do not write to overlapping files that are being changed. This feature is experimental and requires ZooKeeper or HiveMetastore to acquire locks.

6.2 Enabling concurrent writes

1) Parameters
(1) To enable optimistic concurrent writes, set the following properties

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>

Hudi's lock provider can be backed by ZooKeeper, HiveMetastore, or Amazon DynamoDB (choose one).
(2) ZooKeeper-related parameters

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path

(3) HiveMetastore-related parameters; the HiveMetastore URI is taken from the Hadoop configuration files loaded at runtime

hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table

6.3 Concurrent writes with the Spark DataFrame API

(1) Start spark-shell

spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(2) Write the code

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
  option("hoodie.cleaner.policy.failed.writes", "LAZY").
  option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
  option("hoodie.write.lock.zookeeper.url", "hadoop1,hadoop2,hadoop3").
  option("hoodie.write.lock.zookeeper.port", "2181").
  option("hoodie.write.lock.zookeeper.lock_key", "test_table").
  option("hoodie.write.lock.zookeeper.base_path", "/multiwriter_test").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

(3) Use the ZooKeeper client to verify that ZooKeeper was used.

/opt/module/apache-zookeeper-3.5.7/bin/zkCli.sh 
[zk: localhost:2181(CONNECTED) 0] ls /

(4) The corresponding znodes have been created in ZooKeeper; the node under /multiwriter_test is the lock_key specified in the code

[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test

6.4 Concurrent writes with DeltaStreamer

Building on the earlier DeltaStreamer example of consuming Kafka data into Hudi, this time add the concurrent-write parameters.
1) Go to the configuration directory, add the corresponding parameters to a copy of the configuration file, and upload it to HDFS

cd /opt/module/hudi-props/
cp kafka-source.properties kafka-multiwriter-source.properties
vim kafka-multiwriter-source.properties

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=hadoop1,hadoop2,hadoop3
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=test_table2
hoodie.write.lock.zookeeper.base_path=/multiwriter_test2

hadoop fs -put /opt/module/hudi-props/kafka-multiwriter-source.properties /hudi-props

2) Run DeltaStreamer

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.0.jar \
--props hdfs://hadoop1:8020/hudi-props/kafka-multiwriter-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field userid \
--target-base-path hdfs://hadoop1:8020/tmp/hudi/hudi_test_multi  \
--target-table hudi_test_multi \
--op INSERT \
--table-type MERGE_ON_READ

3) Check whether new znodes were created in ZooKeeper

/opt/module/apache-zookeeper-3.5.7-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test2

7. General Tuning

7.1 Parallelism

Hudi partitions its input with a default parallelism of 1500 to keep each Spark partition within the 2 GB limit (a limit removed after Spark 2.4.0); adjust it accordingly for larger inputs. It is recommended to set the shuffle parallelism, via hoodie.[insert|upsert|bulkinsert].shuffle.parallelism, so that it is at least input_data_size / 500 MB.
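
Continuing the spark-shell example from section 2, these shuffle-parallelism settings can be passed as write options; the values in the sketch below are illustrative only and should be sized so that each task handles roughly 500 MB of input:

// Illustrative values only; tune so that parallelism >= input size / 500 MB
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.insert.shuffle.parallelism", "200").
  option("hoodie.upsert.shuffle.parallelism", "200").
  option("hoodie.bulkinsert.shuffle.parallelism", "200").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)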

7.2 Off-heap memory

Writing parquet files with Hudi uses a certain amount of off-heap memory; if you run into such failures, consider raising values such as spark.yarn.executor.memoryOverhead or spark.yarn.driver.memoryOverhead.

7.3 Spark memory

Hudi typically needs to read a single file into memory to perform merges or compaction, so executor memory should be large enough to hold such a file. Hudi also caches input data in order to place data intelligently, so reserving some spark.memory.storageFraction usually helps performance.

7.4 Tuning file sizes

Set limitFileSize to balance ingest/write latency against the number of files, and against the metadata overhead associated with the file data.
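
One way to express this is via the hoodie.parquet.* write configs; this is only a sketch and the byte values below are illustrative, not recommendations:

// Target base file size and the "small file" threshold below which files keep receiving inserts
val fileSizingOpts = Map(
  "hoodie.parquet.max.file.size"    -> (120 * 1024 * 1024).toString, // ~120 MB target file size
  "hoodie.parquet.small.file.limit" -> (100 * 1024 * 1024).toString  // files below ~100 MB are padded with new inserts
)
// Merge into a write, e.g. df.write.format("hudi").options(getQuickstartWriteConfigs).options(fileSizingOpts)...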

7.5 Time series / log data

The defaults suit database/NoSQL change logs with relatively large individual records. Another very common class of data is time-series/event/log data, which tends to be much larger, with many more records per partition. In that case, consider tuning the Bloom filter accuracy via .bloomFilterFPP()/bloomFilterNumEntries() to speed up target index lookups, and consider a key prefixed with event time, which enables range pruning and significantly speeds up index lookups.
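
As a sketch, the Bloom filter sizing mentioned above maps to the hoodie.index.bloom.* write configs, which is an assumption about the config-key form; the values below are illustrative:

// More entries / lower false-positive probability => larger but more precise Bloom filters
val bloomOpts = Map(
  "hoodie.index.bloom.num_entries" -> "100000",   // expected number of entries per file
  "hoodie.index.bloom.fpp"         -> "0.000001"  // target false-positive probability
)
// Merge into a write the same way as the file-sizing options above, e.g. .options(bloomOpts)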

7.6 GC tuning

Be sure to follow the garbage collection tuning tips in the Spark tuning guide to avoid OutOfMemory errors. [Required] Use the G1/CMS collectors; an example of options to add to spark.executor.extraJavaOptions:

-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof

7.7 OutOfMemory errors

If OOM errors occur, try settings such as spark.memory.fraction=0.2 and spark.memory.storageFraction=0.2, which allow data to spill instead of OOM (slower, but better than intermittent crashes).

7.8 A complete production configuration

spark.driver.extraClassPath /etc/hive/conf
spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize 2g
spark.driver.memory 4g
spark.executor.cores 1
spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id driver
spark.executor.instances 300
spark.executor.memory 6g
spark.rdd.compress true

spark.kryoserializer.buffer.max 512m
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled true
spark.sql.hive.convertMetastoreParquet false
spark.submit.deployMode cluster
spark.task.cpus 1
spark.task.maxFailures 4

spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 3072
spark.yarn.max.executor.failures 100
