五、Hudi集成Flink案例详解

5.1 hudi集成flink

flink的下载地址:

https://archive.apache.org/dist/flink/

HudiSupported Flink version
0.12.x1.15.x1.14.x1.13.x
0.11.x1.14.x1.13.x
0.10.x1.13.x
0.9.01.12.2
  • 将上述编译好的安装包拷贝到flink下的jars目录中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
  • 拷贝guava包,解决依赖冲突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
  • 配置Hadoop环境变量
vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATH=`hadoop classpath`export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopsource /etc/profile.d/my_env.sh

5.2 sql-client之yarn-session模式

配置hadoop调度器yarn

mapred-site.xml<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property></configuration>yarn-site.xml<configuration><property><name>yarn.resourcemanager.hostname</name><value>centos04</value></property><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property></configuration>hadoop-env.sh# 在最后面添加如下:export YARN_RESOURCEMANAGER_USER=rootexport YARN_NODEMANAGER_USER=root# 记得配置sql-client-defaults.yaml

5.2.1 启动

# 1、修改配置文件vim /opt/apps/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: falsetaskmanager.numberOfTaskSlots: 4 state.backend: rocksdbexecution.checkpointing.interval: 30000 # 开启ck,才能快速从内存中flush出去state.checkpoints.dir: hdfs://centos04:9000/ckpsstate.backend.incremental: true# 2、yarn-session模式启动# 解决依赖问题cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/# 启动yarn-session/opt/apps/flink-1.13.6/bin/yarn-session.sh -d# 启动sql-client/opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

5.2.2 插入数据

set sql-client.execution.result-mode=tableau;-- 创建hudi表CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' -- 默认是COW);或如下写法CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED)PARTITIONED BY (`partition`)WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ');-- 插入数据INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');-- 查询数据select * from t1;

5.2.3 流式插入

-- 1、创建测试表CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)) WITH ('connector' = 'datagen','rows-per-second' = '1');create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20))with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ');-- 2、执行插入insert into t2 select * from sourceT;查询结果set sql-client.execution.result-mode=tableau;Flink SQL> select * from t2 limit 10;-- 会产生一个collect的flink任务,拉取10条数据,注意:不是流读取2023-03-06 22:45:10,403 INFOorg.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false2023-03-06 22:45:12,897 INFOorg.apache.hadoop.yarn.client.RMProxy[] - Connecting to ResourceManager at centos04/192.168.42.104:80322023-03-06 22:45:12,899 INFOorg.apache.flink.yarn.YarnClusterDescriptor[] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar2023-03-06 22:45:12,918 INFOorg.apache.flink.yarn.YarnClusterDescriptor[] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'.+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+| op | uuid | name | age |ts |partition |+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+| +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... |1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... || +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... || +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... |2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... || +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... || +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... |-682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... || +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... |2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... || +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... |-190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... || +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... || +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... |71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... || +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... |1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... |+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

5.3 使用IDEA开发

除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。

1、首先,需要将hudi集成flink的jar包,装载到本地的仓库,命令如下:

D:\bigdata\hudi从入门到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar[INFO] Scanning for projects...[INFO][INFO] ------------------< org.apache.maven:standalone-pom >-------------------[INFO] Building Maven Stub Project (No POM) 1[INFO] --------------------------------[ pom ]---------------------------------[INFO][INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom ---[INFO] Installing D:\bigdata\hudi从入门到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar[INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time:1.111 s[INFO] Finished at: 2023-03-02T10:08:15+08:00[INFO] ------------------------------------------------------------------------

2、导入pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hudi-start</artifactId><groupId>com.yyds</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hudi-flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope> </dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>
package com.yyds.hudi.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.configuration.RestOptions;import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;import org.apache.flink.contrib.streaming.state.PredefinedOptions;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiTest {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME","root");// 1、创建flinksql的执行环境Configuration conf = new Configuration();conf.setString(RestOptions.BIND_PORT, "8081-8089");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);// 注意:需要设置check-point// 设置状态后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2、使用flink自带connector模拟数据tabEnv.executeSql("CREATE TABLE sourceT (\n" +"uuid varchar(20),\n" +"name varchar(10),\n" +"age int,\n" +"ts timestamp(3),\n" +"`partition` varchar(20)\n" +") WITH (\n" +"'connector' = 'datagen',\n" +"'rows-per-second' = '1'\n" +")");// 3、创建hudi表tabEnv.executeSql("create table t2(\n" +"uuid varchar(20),\n" +"name varchar(10),\n" +"age int,\n" +"ts timestamp(3),\n" +"`partition` varchar(20)\n" +")\n" +"with (\n" +"'connector' = 'hudi',\n" + // 指定connector为hudi"'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n" +"'table.type' = 'MERGE_ON_READ'\n" +// MOR类型的表")");// 4、将模拟产生的数据,写入到Hudi表中tabEnv.executeSql("insert into t2 select * from sourceT");}}

jar包运行

bin/flink run -t yarn-per-job \-c com.yyds.hudi.flink.HudiTest \./myjars/hudi-flink-1.0-SNAPSHOT.jar

类型映射

Flink SQL TypeHudi TypeAvro logical type
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbytes
DECIMALfixeddecimal
TINYINTint
SMALLINTint
INTint
BIGINTlong
FLOATfloat
DOUBLEdouble
DATEintdate
TIMEinttime-millis
TIMESTAMPlongtimestamp-millis
ARRAYarray
MAP(key must be string/char/varchar type)map
MULTISET(element must be string/char/varchar type)map
ROWrecord

5.4 hudi核心参数

5.4.1 去重参数

-- 通过如下语法设置主键:-- 设置单个主键create table hoodie_table (f0 int primary key not enforced,f1 varchar(20),...) with ('connector' = 'hudi',...)-- 设置联合主键create table hoodie_table (f0 int,f1 varchar(20),...primary key(f0, f1) not enforced) with ('connector' = 'hudi',...)
名称说明默认值备注
hoodie.datasource.write.recordkey.field主键字段支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段
precombine.field(0.13.0 之前版本为 write.precombine.field)去重时间字段record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

5.4.2 并发参数

名称说明默认值备注
write.taskswriter 的并发,每个 writer 顺序写 1~N 个 buckets4增加并发对小文件个数没影响
write.bucket_assign.tasksbucket assigner 的并发Flink的并行度增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket) 数
write.index_bootstrap.tasksIndex bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数Flink的并行度只在 index.bootstrap.enabled 为 true 时生效
read.tasks读算子的并发(batch 和 stream)4
compaction.tasksonline compaction 算子的并发writer 的并发online compaction 比较耗费资源,建议走 offline compaction

可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

案例如下:

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */select * from sourceT;# 从下图可以看出,writer 的并发变成了2,bucket assigner 的并发变成了3,compaction_task 变成了4


可以参考下面Hudi表读取原理,看上图。

5.4.3 压缩参数

​ 在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启(即上图的compact_plan_generate步骤),之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan

名称说明默认值备注
compaction.schedule.enabled是否阶段性生成压缩 plantrue建议开启,即使compaction.async.enabled 关闭的情况下
compaction.async.enabled是否开启异步压缩true通过关闭此参数关闭在线压缩
compaction.tasks压缩 task 并发4
compaction.trigger.strategy压缩策略num_commits支持四种策略:num_commits、time_elapsed、num_and_time、num_or_time
compaction.delta_commits默认策略,5 个 commits 压缩一次5
compaction.delta_seconds3600
compaction.max_memory压缩去重的 hash map 可用内存100(MB)资源够用的话建议调整到 1GB
compaction.target_io每个压缩 plan 的 IO 上限,默认 5GB500(GB)

案例如下:

CREATE TABLE t3(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20))WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t3','compaction.async.enabled' = 'true', -- 异步在线压缩'compaction.tasks' = '1','compaction.schedule.enabled' = 'true', -- 生成压缩 plan'compaction.trigger.strategy' = 'num_commits', -- 压缩策略,安装commit次数进行压缩'compaction.delta_commits' = '2', -- 2次进行压缩'table.type' = 'MERGE_ON_READ');set table.dynamic-table-options.enabled=true;insert into t3select * from sourceT/*+ OPTIONS('rows-per-second' = '5') */;-- 从hdfs上可以看到,flink发生两次ck,delta_commit提交两次后,将log文件进行压缩,然后生成了parquet文件。

5.4.4 文件大小

​ Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。

​ 目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。

名称说明默认值备注
hoodie.parquet.max.file.size最大可写入的 parquet 文件大小120 * 1024 * 1024默认 120MB(单位 byte)超过该大小切新的 file group
hoodie.logfile.to.parquet.compression.ratiolog文件大小转 parquet 的比率0.35hoodie 统一依据 parquet 大小来评估小文件策略
hoodie.parquet.small.file.limit在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件104857600默认 100MB(单位 byte)大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大
hoodie.copyonwrite.record.size.estimate预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过 hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数1024默认 1KB(单位 byte)如果作业流量比较小,可以设置下这个参数
hoodie.logfile.max.sizeLogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。1073741824默认1GB(单位 byte)

案例如下:

CREATE TABLE t4(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20))WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t4','compaction.tasks' = '1','hoodie.parquet.max.file.size'= '10000', -- 最大可写入的 parquet 文件大小,设置为10 KB'hoodie.parquet.small.file.limit'='5000', -- 小文件的大小阈值,小于该参数的文件被认为是小文件 设置为5KB'table.type' = 'MERGE_ON_READ');set table.dynamic-table-options.enabled=true;insert into t4select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;

5.4.5 hadoop参数

从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl 指定 per-job 级别的 hadoop 配置

名称说明默认值备注
hadoop.${you option key}通过 hadoop.前缀指定 hadoop 配置项支持同时指定多个 hadoop 配置项

5.5 内存优化

5.5.1 内存参数

名称说明默认值备注
write.task.max.size一个 write task 的最大可用内存1024当前预留给 write buffer 的内存为write.task.max.size -compaction.max_memory当 write task 的内存 buffer达到阈值后会将内存里最大的 buffer flush 出去
write.batch.sizeFlink 的写 task 为了提高写数据效率,会按照写 bucket 提前 buffer 数据,每个 bucket 的数据在内存达到阈值之前会一直 cache 在内存中,当阈值达到会把数据 buffer 传递给 hoodie 的 writer 执行写操作256一般不用设置,保持默认值就好
write.log_block.sizehoodie 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式 buffer 在 writer 内部128一般不用设置,保持默认值就好
write.merge.max_memoryhoodie 在 COW 写操作的时候,会有增量数据和 base file 数据 merge 的过程,增量的数据会缓存在内存的 map 结构里,这个 map 是可 spill 的,这个参数控制了 map 可以使用的堆内存大小100一般不用设置,保持默认值就好
compaction.max_memory同 write.merge.max_memory: 100MB 类似,只是发生在压缩时。100如果是 online compaction,资源充足时可以开大些,比如 1GB

5.5.2 MOR

(1)state backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存)(2)内存够的话,compaction.max_memory 调大些 (默认是 100MB 可以调到 1GB)(3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task (比如 BucketAssignFunction 也会吃些内存)(4)需要关注 compaction 的内存变化,compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小,compaction.tasks 控制了 compaction task 的并发注意: write.task.max.size - compaction.max_memory 是预留给每个 write task 的内存 buffer

5.5.3 COW

(1)state backend 换成 rocksdb(默认的 in-memory state-backend 非常吃内存)。(2)write.task.max.size 和 write.merge.max_memory 同时调大(默认是 1GB 和 100MB 可以调到 2GB 和 1GB)。(3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task(比如 BucketAssignFunction 也会吃些内存)。注意:write.task.max.size - write.merge.max_memory 是预留给每个 write task 的内存 buffer。

5.6 读取方式

5.6.1 流读

​ 当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

名称Required默认值说明
read.streaming.enabledfalsefalse设置 true 开启流读模式
read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(闭区间)
read.streaming.skip_compactionfalsefalse流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费) 2) changelog 模式下保证语义正确性 0.11 开始,以上两个问题已经通过保留 compaction 的 instant time 修复
clean.retain_commitsfalse10cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。
set sql-client.execution.result-mode=tableau;CREATE TABLE t5(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t5','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' -- 默认60s);insert into t5 select * from sourceT;select * from t5;-- 如下图,能够不断的获取数据

5.6.2 增量读取

0.10.0 开始支持。如果有增量读取 batch 数据的需求,增量读取包含三种场景。(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit(3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)
名称Required默认值说明
read.start-commitfalse默认从最新 commit支持 earliest 从最早消费
read.end-commitfalse默认到最新 commit

5.6.3 限流

​ 如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。

名称Required默认值说明
write.rate.limitfalse0默认关闭限速

5.7 写入方式

5.7.1 通过flink-cdc进行写入

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi

第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。

1)准备MySQL表

(1)MySQL开启binlog

(2)建表

create database test;use test;create table stu3 ( id int unsigned auto_increment primary key COMMENT '自增id', name varchar(20) not null comment '学生名字', school varchar(20) not null comment '学校名字', nickname varchar(20) not null comment '学生小名', age int not null comment '学生年龄', class_num int not null comment '班级人数', phone bigint not null comment '电话号码', email varchar(64) comment '家庭网络邮箱', ip varchar(32) comment 'IP地址' ) engine=InnoDB default charset=utf8;

2)flink读取mysql binlog并写入kafka

(1)创建MySQL表

create table stu3_binlog( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced) with ( 'connector' = 'mysql-cdc', 'hostname' = 'centos01', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'stu3');

(2)创建Kafka表

create table stu3_binlog_sink_kafka( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced) with ( 'connector' = 'upsert-kafka' ,'topic' = 'cdc_mysql_stu3_sink' ,'properties.zookeeper.connect' = 'centos01:2181' ,'properties.bootstrap.servers' = 'centos01:9092' ,'key.format' = 'json' ,'value.format' = 'json');

(3)将mysql binlog日志写入kafka

insert into stu3_binlog_sink_kafkaselect * from stu3_binlog;

3)flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

create table stu3_binlog_source_kafka( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string ) with ( 'connector' = 'kafka', 'topic' = 'cdc_mysql_stu3_sink', 'properties.bootstrap.servers' = 'hadoop1:9092', 'format' = 'json', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'testGroup' );

(2)创建hudi目标表

create table stu3_binlog_sink_hudi( id bigint not null, name string, `school` string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced)partitioned by (`school`)with ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi', 'table.type' = 'MERGE_ON_READ', 'write.option' = 'insert', 'write.precombine.field' = 'school' );

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudiselect * fromstu3_binlog_source_kafka;

5.7.2 离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。SET execution.runtime-mode = batch; SET execution.checkpointing.interval = 0;(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。
名称Required默认值说明
write.operationTRUEupsert配置 bulk_insert 开启该功能
write.tasksFALSE4bulk_insert 写 task 的并发,最后的文件数 >=write.tasks
write.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(从 0.11 开始)FALSETRUE是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险
write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(从 0.11 开始)FALSETRUE是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量
write.sort.memory128sort 算子的可用 managed memory(单位 MB)

5.7.3 全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)

名称Required默认值说明
index.bootstrap.enabledtruefalse开启索引加载,会将已存表的最新数据一次性加载到 state 中
index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区
使用流程(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确(2)设置 index.bootstrap.enabled = true开启索引加载功能(3)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle说明:索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度

5.8 写入模式

5.8.1 Changelog 模式

​ 如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录

1)WITH 参数

名称Required默认值说明
changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

​ 批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

​ 开启 changelog.enabled 参数后,中间的变更也只是 Best Effort: 异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的 buffer 时间可以预留一定的时间 buffer 给 reader,比如调整压缩的两个参数:

​ Ø compaction.delta_commits:5

​ Ø compaction.delta_seconds: 3600

说明:

Changelog 模式开启流读的话,要在 sql-client 里面设置参数:

set sql-client.execution.result-mode=tableau;

或者

set sql-client.execution.result-mode=changelog;

否则中间结果在读的时候会被直接合并。

2)流读 changelog

仅在 0.10.0 支持,本 feature 为实验性。

开启 changelog 模式后,hudi 会保留一段时间的 changelog 供下游 consumer 消费,我们可以通过流读 ODS 层 changelog 接上 ETL 逻辑写入到 DWD 层,如下图的 pipeline:

​ 流读的时候我们要注意 changelog 有可能会被 compaction 合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3)案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau; CREATE TABLE t6(id int,ts int,primary key (id) not enforced) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4','changelog.enabled' = 'true');insert into t6 values (1,1);insert into t6 values (1,2);set table.dynamic-table-options.enabled=true;select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v(id int,ts int,primary key (id) not enforced) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4');select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

5.8.2 Append 模式

从 0.10 开始支持

对于 INSERT 模式:

​ Ø MOR 默认会 apply 小文件策略: 会追加写 avro log 文件

​ Ø COW 每次直接写新的 parquet 文件,没有小文件策略

Hudi 支持丰富的 Clustering 策略,优化 INSERT 模式下的小文件问题:

1)Inline Clustering

只有 Copy On Write 表支持该模式

名称Required默认值说明
write.insert.clusterfalsefalse是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2) Async Clustering

​ 从 0.12 开始支持

(1)WITH参数

名称Required默认值说明
clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan,默认关闭
clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commits,clustering.schedule.enabled 为 true 时生效
clustering.async.enabledfalsefalse是否异步执行 clustering plan,默认关闭
clustering.tasksfalse4Clustering task 执行并发
clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小,默认 1GB
clustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering,默认600MB
clustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段
clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按时间(天)回溯SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

​ 支持定制化的 clustering 策略。

名称Required默认值说明
clustering.plan.partition.filter.modefalseNONE支持· NONE:不做限制· RECENT_DAYS:按时间(天)回溯· SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天
clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效,指定开始 partition(inclusive)
clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效,指定结束 partition(incluseve)
clustering.plan.strategy.partition.regex.patternfalseN/A正则表达式过滤 partitions
clustering.plan.strategy.partition.selectedfalseN/A显示指定目标 partitions,支持逗号 , 分割多个 partition

5.9 Bucket索引

​ 默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket 索引通过固定的 hash 策略,将相同 key 的数据分配到同一个 fileGroup 中,避免了索引的存储和查询开销。

名称Required默认值说明
index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能
hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集
hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数,当前设置后则不可再变更。
(1)bucket index 没有 state 的存储计算开销,性能较好(2)bucket index 无法扩 buckets,state index 则可以依据文件的大小动态扩容(3)bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制),state index 没有限制

5.10 Hudi Catalog

​ 从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表,避免重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

-- DFS 模式 Catalog SQL样例:CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默认路径}','mode'='dfs' );-- Hms 模式 Catalog SQL 样例:CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默认路径}','hive.conf.dir' = '${hive-site.xml 所在的目录}','mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性);
名称Required默认值说明
catalog.pathtrue默认的 catalog 根路径,用作表路径的自动推导,默认的表路径: c a t a l o g . p a t h/{catalog.path}/ catalog.path/{db_name}/${table_name}
default-databasefalsedefault默认的 database 名
hive.conf.dirfalsehive-site.xml 所在的目录,只在 hms 模式下生效
modefalsedfs支持 hms模式通过 hive 管理元数据
table.externalfalsefalse是否创建外部表,只在 hms 模式下生效

案例如下:

--(1)创建sql-client初始化sql文件vim /opt/apps/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '/tmp/hudi_catalog','mode'='dfs' );USE CATALOG hoodie_catalog;--(2)指定sql-client启动时加载sql文件hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session--(3)建库建表插入create database test;use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20),primary key (uuid) not enforced)with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t2','table.type' = 'MERGE_ON_READ');insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');--(4)退出sql-client,重新进入,表信息还在use test;show tables;select * from t2;

5.11 离线压缩

MOR 表的 compaction 默认是自动打开的,策略是 5 个 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定

5.11.1 设置参数

Ø compaction.async.enabled 为 false,关闭在线 compaction

Ø compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan

5.11.2 原理

一个 compaction 的任务的执行包括两部分:

Ø schedule 压缩 plan

该过程推荐由写任务定时触发,写参数 compaction.schedule.enabled 默认开启

Ø 执行对应的压缩 plan

5.11.3 使用方式

1)执行命令

离线 compaction 需要手动执行 Java 程序,程序入口:

Ø hudi-flink1.13-bundle-0.12.0.jar

Ø org.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行的方式./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table

2)参数配置

参数名required默认值备注
–pathtrue目标表的路径
–compaction-tasksfalse-1压缩 task 的并发,默认是待压缩 file group 的数量
–compaction-max-memoryfalse100 (单位 MB)压缩时 log 数据的索引 map,默认 100MB,内存足够可以开大些
–schedulefalsefalse是否要执行 schedule compaction 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 compaction plan 默认是一直 schedule 的,除非手动关闭(默认 5 个 commits 一次压缩)
–seqfalseLIFO执行压缩任务的顺序,默认是从最新的压缩 plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行
–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)
–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟

案例如下

create table t7(id int,ts int,primary key (id) not enforced)with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t7','compaction.async.enabled' = 'false', -- 关闭自动压缩'compaction.schedule.enabled' = 'true', -- 由写任务阶段性触发压缩 plan'table.type' = 'MERGE_ON_READ');insert into t7 values(1,1);insert into t7 values(2,2);insert into t7 values(3,3);insert into t7 values(4,4);insert into t7 values(5,5);// 命令行的方式./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t7

5.12 离线 Clustering

​ 异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.12.1 设置参数

Ø clustering.async.enabled 为 false,关闭在线 clustering。

Ø clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。

5.12.2 原理

一个 clustering 的任务的执行包括两部分:

Ø schedule plan

推荐由写任务定时触发,写参数 clustering.schedule.enabled 默认开启。

Ø 执行对应的 plan

5.12.3 使用方式

1)执行命令

离线 clustering 需要手动执行 Java 程序,程序入口:

Ø hudi-flink1.13-bundle-0.12.0.jar

Ø org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

注意:必须是分区表,否则报错空指针异常。

# 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table

2)参数配置

参数名required默认值备注
–pathtrue目标表的路径。
–clustering-tasksfalse-1Clustering task 的并发,默认是待压缩 file group 的数量。
–schedulefalsefalse是否要执行 schedule clustering plan 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 clustering plan 默认是一直 schedule 的,除非手动关闭(默认 4 个 commits 一次 clustering)。
–seqfalseFIFO执行压缩任务的顺序,默认是从最老的 clustering plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行
–target-file-max-bytesfalse1024 * 1024 * 1024最大目标文件,默认 1GB。
–small-file-limitfalse600小于该大小的文件会参与 clustering,默认 600MB。
–sort-columnsfalseN/AClustering 可选排序列。
–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)。
–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟。

3)案例演示

create table t8(id int,age int,ts int,primary key (id) not enforced) partitioned by (age)with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t8','clustering.async.enabled' = 'false','clustering.schedule.enabled' = 'true','table.type' = 'COPY_ON_WRITE');insert into t8 values(1,18,1);insert into t8 values(2,18,2);insert into t8 values(3,18,3);insert into t8 values(4,18,4);insert into t8 values(5,18,5);-- 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t8

5.12.4 常见问题

# 存储一直看不到数据如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)当总的 buffer 大小积攒到一定大小(可配,默认 1GB)当 checkpoint 触发,将内存里的数据全部 flush 出去# 数据有重复如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认true。)索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)# Merge On Read 写只有 log 文件Merge On Read 默认开启了异步的 compaction,策略是 5 个 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。

5.13 Hudi核心原理

5.13.1 Hudi数据去重原理

Hoodie 的数据去重分两步:

(1)写入前攒 buffer 阶段去重,核心接口HoodieRecordPayload#preCombine

(2)写入过程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。

1)消息版本新旧

​ 相同 record key (主键)的数据通过write.precombine.field指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,则后来的数据更新。

​ 从 0.10 版本开始,write.precombine.field 字段为可选,如果没有指定,会看 schema 中是否有 ts 字段,如果有,ts 字段被选为 precombine 字段;如果没有指定,schema 中也没有 ts 字段,则为处理顺序:后来的消息默认较新

2)攒消息阶段的去重

​ Hoodie 将 buffer 消息发给 write handle 之前可以执行一次去重操作,通过HoodieRecordPayload#preCombine 接口,保留 precombine 字段较大的消息,此操作为纯内存的计算,在同一个 write task 中为单并发执行。

​ 注意:write.precombine 选项控制了攒消息的去重。

3)写 parquet 增量消息的去重

​ 在Hoodie 写入流程中,Hoodie 每写一个 parquet 都会有 base + 增量 merge 的过程,增量的消息会先放到一个 spillable map 的数据结构构建内存 index,这里的增量数据如果没有提前去重,那么同 key 的后来消息会直接覆盖先来的消息。

​ Writer 接着扫 base 文件,过程中会不断查看内存 index 是否有同 key 的新消息,如果有,会走 HoodieRecordPayload#combineAndGetUpdateValue 接口判断保留哪个消息。

​ 注意: MOR 表的 compaction 阶段和 COW 表的写入流程都会有 parquet 增量消息去重的逻辑。

4)跨 partition 的消息去重

​ 默认情况下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息换了 partition,那么老的 partiiton 消息仍然保留。

​ 开启 index.global.enabled 选项开启跨 partition 去重,原理是先往老的 partiton 发一条删除消息,再写新 partition。

5.13.2 Hudi表写入原理

数据写入、数据压缩与数据清理

1)数据写入分析(1)基础数据封装:将数据流中flink的RowData封装成Hoodie实体;(2)BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新;(3)Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中;(4)Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。2)数据压缩压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:具体策略分为4种,具体见官网说明:compaction.trigger.strategy:Strategy to trigger compaction, options are 1.'num_commits': trigger compaction when reach N delta commits; 2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits'Default Value: num_commits (Optional)在项目实践中需要注意参数'read.streaming.skip_compaction' 参数的配置,其表示在流式读取该表是否跳过压缩后的数据,若该表用于后续聚合操作表的输入表,则需要配置值为true,表示聚合操作表不再消费读取压缩数据。若不配置或配置为false,则该表中的数据在未被压缩之前被聚合操作表读取了一次,在压缩后数据又被读取一次,会导致聚合表的sum、count等算子结果出现双倍情况。3)数据清理随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。

5.13.3 Hudi表读取原理

(1)开启split_monitor算子,每隔N秒(可配置)监听TimeLine上变化,并将变更的Instance封装为FileSlice。

(2)分发log文件时候,按照fileId值进行keyBy,保证同一file group下数据文件都给一个Task进行处理,从而保证数据处理的有序性。

(3)split_reader根据FileSlice信息进行数据读取。